Moved port from configuration file to API call for connect() and bind()

This commit is contained in:
Robinson 2023-06-26 19:28:55 +02:00
parent bdad03111c
commit 55604c679c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
34 changed files with 357 additions and 317 deletions

View File

@ -25,22 +25,11 @@ import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.EventPoller import dorkbox.network.aeron.EventPoller
import dorkbox.network.aeron.mediaDriver.ClientConnectionDriver import dorkbox.network.aeron.mediaDriver.ClientConnectionDriver
import dorkbox.network.aeron.mediaDriver.ClientHandshakeDriver import dorkbox.network.aeron.mediaDriver.ClientHandshakeDriver
import dorkbox.network.connection.Connection import dorkbox.network.connection.*
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.EventDispatcher
import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
import dorkbox.network.connection.PublicKeyValidationState import dorkbox.network.exceptions.*
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientHandshakeException
import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientShutdownException
import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.exceptions.ServerException
import dorkbox.network.exceptions.TransmitException
import dorkbox.network.handshake.ClientHandshake import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.ping.Ping import dorkbox.network.ping.Ping
import dorkbox.util.sync.CountDownLatch import dorkbox.util.sync.CountDownLatch
@ -166,24 +155,45 @@ open class Client<CONNECTION : Connection>(
} }
/** /**
* The network or IPC address for the client to connect to. * The network address that the client connected to
*
* For a network address, it can be:
* - a network name ("localhost", "loopback", "lo", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1")
*
* For the IPC (Inter-Process-Communication) address. it must be:
* - the IPC integer ID, "0x1337c0de", "0x12312312", etc.
*/ */
@Volatile @Volatile
var remoteAddress: InetAddress? = IPv4.LOCALHOST var address: InetAddress? = IPv4.LOCALHOST
private set private set
/** /**
* the remote address, as a string. * The network address that the client connected to, as a string.
*/ */
@Volatile @Volatile
var remoteAddressString: String = "UNKNOWN" var addressString: String = "UNKNOWN"
private set
/**
* The network address that the client connected to, as a pretty string.
*/
@Volatile
var addressPrettyString: String = "UNKNOWN"
private set
/**
* The machine port that the client connected to,
*/
@Volatile
var port: Int = 0
private set
/**
* The default connection reliability type (ie: can the lower-level network stack throw away data that has errors, for example real-time-voice)
*/
@Volatile
var reliable: Boolean = true
private set
/**
* How long (in seconds) will connections wait to connect. 0 will wait indefinitely,
*/
@Volatile
var connectionTimeoutSec: Int = 0
private set private set
private val handshake = ClientHandshake(this, logger) private val handshake = ClientHandshake(this, logger)
@ -200,50 +210,44 @@ open class Client<CONNECTION : Connection>(
} }
/** /**
* Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed. * Will attempt to re-connect to the server, with the settings previously used when calling connect()
*
* Default connection is to localhost
*
* ### For a network address, it can be:
* - a network name ("localhost", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1")
* - an InetAddress address
*
* ### For the IPC (Inter-Process-Communication) it must be:
* - `connect()`
* - `connect("")`
* - `connectIpc()`
*
* ### Case does not matter, and "localhost" is the default.
*
* @param remoteAddress The network or if localhost, IPC address for the client to connect to
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?).
* *
* @throws IllegalArgumentException if the remote address is invalid * @throws IllegalArgumentException if the remote address is invalid
* @throws ClientTimedOutException if the client is unable to connect in x amount of time * @throws ClientTimedOutException if the client is unable to connect in x amount of time
* @throws ClientRejectedException if the client connection is rejected * @throws ClientRejectedException if the client connection is rejected
* @throws ClientShutdownException if the client connection is shutdown while trying to connect
* @throws ClientException if there are misc errors
*/ */
fun connect( @Suppress("DuplicatedCode")
remoteAddress: InetAddress, suspend fun reconnect() {
connectionTimeoutSec: Int = 30, connect(
reliable: Boolean = true) = runBlocking { remoteAddress = address,
remoteAddressString = addressString,
val remoteAddressString = when (remoteAddress) { remoteAddressPrettyString = addressPrettyString,
is Inet4Address -> IPv4.toString(remoteAddress) port = port,
is Inet6Address -> IPv6.toString(remoteAddress, true) connectionTimeoutSec = connectionTimeoutSec,
else -> throw IllegalArgumentException("Cannot connect to $remoteAddress It is an invalid address type!") reliable = reliable,
} )
}
// Default IPC ports are flipped because they are in the perspective of the SERVER
connect(remoteAddress = remoteAddress, /**
remoteAddressString = remoteAddressString, * Will attempt to connect via IPC to the server, with a default 30 second connection timeout and will block until completed.
remoteAddressPrettyString = remoteAddressString, *
connectionTimeoutSec = connectionTimeoutSec, * ### For the IPC (Inter-Process-Communication) it must be:
reliable = reliable) * - `connectIpc()`
*
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
*
* @throws ClientTimedOutException if the client is unable to connect in x amount of time
*/
fun connectIpc(connectionTimeoutSec: Int = 30) = runBlocking {
connect(remoteAddress = null, // required!
port = 0,
remoteAddressString = IPC_NAME,
remoteAddressPrettyString = IPC_NAME,
connectionTimeoutSec = connectionTimeoutSec)
} }
// TODO:: the port should be part of the connect function!
/** /**
* Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed. * Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed.
@ -254,25 +258,79 @@ open class Client<CONNECTION : Connection>(
* - a network name ("localhost", "bob.example.org") * - a network name ("localhost", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1") * - an IP address ("127.0.0.1", "123.123.123.123", "::1")
* - an InetAddress address * - an InetAddress address
* - if no address is specified, and IPC is disabled in the config, then localhost will be selected * - `connect(LOCALHOST)`
* - `connect("localhost")`
* - `connect("bob.example.org")`
* - `connect("127.0.0.1")`
* - `connect("::1")`
* *
* ### For the IPC (Inter-Process-Communication) it must be: * ### For the IPC (Inter-Process-Communication) it must be:
* - `connect()` (only if ipc is enabled in the configuration) * - `connectIPC()`
* - `connect("")` (only if ipc is enabled in the configuration)
* - `connectIpc()`
* *
* ### Case does not matter, and "localhost" is the default. * ### Case does not matter, and "localhost" is the default.
* *
* @param remoteAddress The network host or ip address * @param remoteAddress The network or if localhost, IPC address for the client to connect to
* @param port The network host port to connect to
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?). * @param reliable true if we want to create a reliable connection, can the lower-level network stack throw away data that has errors, (IE: real-time-voice traffic)
*
* @throws IllegalArgumentException if the remote address is invalid
* @throws ClientTimedOutException if the client is unable to connect in x amount of time
* @throws ClientRejectedException if the client connection is rejected
*/
fun connect(
remoteAddress: InetAddress,
port: Int,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true) = runBlocking {
val remoteAddressString = when (remoteAddress) {
is Inet4Address -> IPv4.toString(remoteAddress)
is Inet6Address -> IPv6.toString(remoteAddress, true)
else -> throw IllegalArgumentException("Cannot connect to $remoteAddress It is an invalid address type!")
}
connect(remoteAddress = remoteAddress,
remoteAddressString = remoteAddressString,
remoteAddressPrettyString = remoteAddressString,
port = port,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
}
/**
* Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed.
*
* Default connection is to localhost
*
* ### For a network address, it can be:
* - a network name ("localhost", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1")
* - an InetAddress address
* - `connect(LOCALHOST)`
* - `connect("localhost")`
* - `connect("bob.example.org")`
* - `connect("127.0.0.1")`
* - `connect("::1")`
*
* ### For the IPC (Inter-Process-Communication) it must be:
* - `connectIPC()`
*
* ### Case does not matter, and "localhost" is the default.
*
* @param remoteAddress The network host name or ip address
* @param port The network host port to connect to
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely
* @param reliable true if we want to create a reliable connection, can the lower-level network stack throw away data that has errors, (IE: real-time-voice traffic)
* *
* @throws IllegalArgumentException if the remote address is invalid * @throws IllegalArgumentException if the remote address is invalid
* @throws ClientTimedOutException if the client is unable to connect in x amount of time * @throws ClientTimedOutException if the client is unable to connect in x amount of time
* @throws ClientRejectedException if the client connection is rejected * @throws ClientRejectedException if the client connection is rejected
*/ */
fun connect( fun connect(
remoteAddress: String = "", remoteAddress: String,
port: Int,
connectionTimeoutSec: Int = 30, connectionTimeoutSec: Int = 30,
reliable: Boolean = true) { reliable: Boolean = true) {
fun connect(dnsResolveType: ResolvedAddressTypes) = runBlocking { fun connect(dnsResolveType: ResolvedAddressTypes) = runBlocking {
@ -298,6 +356,7 @@ open class Client<CONNECTION : Connection>(
// we check again, because the inetAddress that comes back from DNS, might not be what we expect // we check again, because the inetAddress that comes back from DNS, might not be what we expect
remoteAddressString = remoteAddressAsIp, remoteAddressString = remoteAddressAsIp,
remoteAddressPrettyString = formattedString, remoteAddressPrettyString = formattedString,
port = port,
connectionTimeoutSec = connectionTimeoutSec, connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable) reliable = reliable)
} }
@ -306,6 +365,7 @@ open class Client<CONNECTION : Connection>(
// this is default IPC settings // this is default IPC settings
remoteAddress.isEmpty() && config.enableIpc -> runBlocking { remoteAddress.isEmpty() && config.enableIpc -> runBlocking {
connect(remoteAddress = null, // required! connect(remoteAddress = null, // required!
port = 0,
remoteAddressString = IPC_NAME, remoteAddressString = IPC_NAME,
remoteAddressPrettyString = IPC_NAME, remoteAddressPrettyString = IPC_NAME,
connectionTimeoutSec = connectionTimeoutSec) connectionTimeoutSec = connectionTimeoutSec)
@ -330,17 +390,21 @@ open class Client<CONNECTION : Connection>(
* - a network name ("localhost", "bob.example.org") * - a network name ("localhost", "bob.example.org")
* - an IP address ("127.0.0.1", "123.123.123.123", "::1") * - an IP address ("127.0.0.1", "123.123.123.123", "::1")
* - an InetAddress address * - an InetAddress address
* - `connect()` (same as localhost, but only if ipc is disabled in the configuration)
* - `connect("localhost")`
* - `connect("bob.example.org")`
* - `connect("127.0.0.1")`
* - `connect("::1")`
* *
* ### For the IPC (Inter-Process-Communication) it must be: * ### For the IPC (Inter-Process-Communication) it must be:
* - `connect()` * - `connect()` (only if ipc is enabled in the configuration)
* - `connect("")`
* - `connectIpc()`
* *
* ### Case does not matter, and "localhost" is the default. * ### Case does not matter, and "localhost" is the default.
* *
* @param remoteAddress The network or if localhost for the client to connect to * @param remoteAddress The network or if localhost for the client to connect to
* @param port The network host port to connect to
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely. * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
* @param reliable true if we want to create a reliable connection (for UDP connections, is message loss acceptable?). * @param reliable true if we want to create a reliable connection, can the lower-level network stack throw away data that has errors, (IE: real-time-voice traffic)
* *
* @throws IllegalArgumentException if the remote address is invalid * @throws IllegalArgumentException if the remote address is invalid
* @throws ClientTimedOutException if the client is unable to connect in x amount of time * @throws ClientTimedOutException if the client is unable to connect in x amount of time
@ -353,6 +417,7 @@ open class Client<CONNECTION : Connection>(
remoteAddress: InetAddress? = null, remoteAddress: InetAddress? = null,
remoteAddressString: String, remoteAddressString: String,
remoteAddressPrettyString: String, remoteAddressPrettyString: String,
port: Int = 0,
connectionTimeoutSec: Int = 30, connectionTimeoutSec: Int = 30,
reliable: Boolean = true) reliable: Boolean = true)
{ {
@ -363,15 +428,20 @@ open class Client<CONNECTION : Connection>(
val currentDispatcher = EventDispatcher.getCurrentEvent() val currentDispatcher = EventDispatcher.getCurrentEvent()
if (currentDispatcher != null && currentDispatcher != EventDispatcher.CONNECT) { if (currentDispatcher != null && currentDispatcher != EventDispatcher.CONNECT) {
EventDispatcher.CONNECT.launch { EventDispatcher.CONNECT.launch {
connect(remoteAddress, connect(
remoteAddressString, remoteAddress = remoteAddress,
remoteAddressPrettyString, remoteAddressString = remoteAddressString,
connectionTimeoutSec, remoteAddressPrettyString = remoteAddressPrettyString,
reliable) port = port,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
} }
return return
} }
require(port > 0 || remoteAddress == null) { "port must be > 0" }
require(port < 65535) { "port must be < 65535" }
// the lifecycle of a client is the ENDPOINT (measured via the network event poller) and CONNECTION (measure from connection closed) // the lifecycle of a client is the ENDPOINT (measured via the network event poller) and CONNECTION (measure from connection closed)
if (!waitForClose()) { if (!waitForClose()) {
if (endpointIsRunning.value) { if (endpointIsRunning.value) {
@ -388,15 +458,6 @@ open class Client<CONNECTION : Connection>(
connection0 = null connection0 = null
// localhost/loopback IP might not always be 127.0.0.1 or ::1
// will be null if it's IPC
this.remoteAddress = remoteAddress
// will be exactly 'IPC' if it's IPC
// if it's an IP address, it will be the IP address
// if it's a DNS name, the name will be resolved, and it will be DNS (IP)
this.remoteAddressString = remoteAddressString
// only try to connect via IPv4 if we have a network interface that supports it! // only try to connect via IPv4 if we have a network interface that supports it!
if (remoteAddress is Inet4Address && !IPv4.isAvailable) { if (remoteAddress is Inet4Address && !IPv4.isAvailable) {
require(false) { "Unable to connect to the IPv4 address $remoteAddressPrettyString, there are no IPv4 interfaces available!" } require(false) { "Unable to connect to the IPv4 address $remoteAddressPrettyString, there are no IPv4 interfaces available!" }
@ -428,6 +489,19 @@ open class Client<CONNECTION : Connection>(
return return
} }
// localhost/loopback IP might not always be 127.0.0.1 or ::1
// will be null if it's IPC
this.address = remoteAddress
// will be exactly 'IPC' if it's IPC
// if it's an IP address, it will be the IP address
// if it's a DNS name, the name will be resolved, and it will be DNS (IP)
this.addressString = remoteAddressString
this.addressPrettyString = remoteAddressString
this.port = port
this.reliable = reliable
this.connectionTimeoutSec = connectionTimeoutSec
val isSelfMachine = remoteAddress?.isLoopbackAddress == true || remoteAddress == lanAddress val isSelfMachine = remoteAddress?.isLoopbackAddress == true || remoteAddress == lanAddress
@ -493,7 +567,7 @@ open class Client<CONNECTION : Connection>(
autoChangeToIpc = autoChangeToIpc, autoChangeToIpc = autoChangeToIpc,
remoteAddress = remoteAddress, remoteAddress = remoteAddress,
remoteAddressString = remoteAddressString, remoteAddressString = remoteAddressString,
config = config, port = port,
handshakeTimeoutSec = handshakeTimeoutSec, handshakeTimeoutSec = handshakeTimeoutSec,
reliable = reliable, reliable = reliable,
logger = logger logger = logger
@ -568,7 +642,7 @@ open class Client<CONNECTION : Connection>(
} else if (isIPC) { } else if (isIPC) {
"IPC" "IPC"
} else { } else {
remoteAddressPrettyString + ":" + config.port "$remoteAddressPrettyString:$port"
} }
// we timed out. Throw the appropriate exception // we timed out. Throw the appropriate exception
@ -606,13 +680,13 @@ open class Client<CONNECTION : Connection>(
val validateRemoteAddress = if (handshakeConnection.pubSub.isIpc) { val validateRemoteAddress = if (handshakeConnection.pubSub.isIpc) {
PublicKeyValidationState.VALID PublicKeyValidationState.VALID
} else { } else {
crypto.validateRemoteAddress(remoteAddress!!, remoteAddressString, connectionInfo.publicKey) crypto.validateRemoteAddress(address!!, addressString, connectionInfo.publicKey)
} }
if (validateRemoteAddress == PublicKeyValidationState.INVALID) { if (validateRemoteAddress == PublicKeyValidationState.INVALID) {
handshakeConnection.close() handshakeConnection.close()
val exception = ClientRejectedException("Connection to [$remoteAddressString] not allowed! Public key mismatch.") val exception = ClientRejectedException("Connection to [$addressString] not allowed! Public key mismatch.")
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
throw exception throw exception
} }
@ -637,7 +711,7 @@ open class Client<CONNECTION : Connection>(
val exception = if (handshakeConnection.pubSub.isIpc) { val exception = if (handshakeConnection.pubSub.isIpc) {
ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to IPC has incorrect class registration details!!")
} else { } else {
ClientRejectedException("[${handshake.connectKey}] Connection to [$remoteAddressString] has incorrect class registration details!!") ClientRejectedException("[${handshake.connectKey}] Connection to [$addressString] has incorrect class registration details!!")
} }
exception.cleanStackTraceInternal() exception.cleanStackTraceInternal()
listenerManager.notifyError(exception) listenerManager.notifyError(exception)
@ -672,13 +746,13 @@ open class Client<CONNECTION : Connection>(
newConnection = connectionFunc(ConnectionParams(uuid, this, clientConnection.connectionInfo, PublicKeyValidationState.VALID)) newConnection = connectionFunc(ConnectionParams(uuid, this, clientConnection.connectionInfo, PublicKeyValidationState.VALID))
} else { } else {
newConnection = connectionFunc(ConnectionParams(uuid, this, clientConnection.connectionInfo, validateRemoteAddress)) newConnection = connectionFunc(ConnectionParams(uuid, this, clientConnection.connectionInfo, validateRemoteAddress))
remoteAddress!! address!!
// NOTE: Client can ALWAYS connect to the server. The server makes the decision if the client can connect or not. // NOTE: Client can ALWAYS connect to the server. The server makes the decision if the client can connect or not.
logger.info { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) adding new signature for [$remoteAddressString] : ${connectionInfo.publicKey.toHexString()}" } logger.info { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) adding new signature for [$addressString] : ${connectionInfo.publicKey.toHexString()}" }
storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey) storage.addRegisteredServerKey(address!!, connectionInfo.publicKey)
} }
connection0 = newConnection connection0 = newConnection
@ -690,14 +764,14 @@ open class Client<CONNECTION : Connection>(
try { try {
handshake.done(handshakeConnection, clientConnection, connectionTimeoutSec, handshakeConnection.details) handshake.done(handshakeConnection, clientConnection, connectionTimeoutSec, handshakeConnection.details)
} catch (e: Exception) { } catch (e: Exception) {
listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] error during handshake", e)) listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] error during handshake", e))
throw e throw e
} }
// finished with the handshake, so always close these! // finished with the handshake, so always close these!
handshakeConnection.close() handshakeConnection.close()
logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] done with handshake." } logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$addressString] done with handshake." }
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
listenerManager.notifyInit(newConnection) listenerManager.notifyInit(newConnection)

View File

@ -301,25 +301,6 @@ abstract class Configuration {
} }
/**
* Specify the UDP port to use. This port is used to establish client-server connections.
*
* When used for the server, this is the subscription port, which will be listening for incoming connections
* When used for the client, this is the publication port, which is what port to connect to when establishing a connection
*
* When the client/server are the SAME machine (by checking if it's loopback, or if the remote address is the lan address), then
* the client will auto-select a random port.
*
* This means that client-pub -> {{network}} -> server-sub
*
* Must be the value of an unsigned short and greater than 0.
*/
var port: Int = 0
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/** /**
* How long a connection must be disconnected before we cleanup the memory associated with it * How long a connection must be disconnected before we cleanup the memory associated with it
*/ */
@ -667,9 +648,6 @@ abstract class Configuration {
} }
} }
require(port > 0) { "configuration controlPort must be > 0" }
require(port < 65535) { "configuration controlPort must be < 65535" }
require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" } require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" }
require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" } require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" }
@ -882,7 +860,6 @@ abstract class Configuration {
config.enableIPv6 = enableIPv6 config.enableIPv6 = enableIPv6
config.enableIpc = enableIpc config.enableIpc = enableIpc
config.enableRemoteSignatureValidation = enableRemoteSignatureValidation config.enableRemoteSignatureValidation = enableRemoteSignatureValidation
config.port = port
config.connectionCloseTimeoutInSeconds = connectionCloseTimeoutInSeconds config.connectionCloseTimeoutInSeconds = connectionCloseTimeoutInSeconds
config.connectionCheckIntervalNanos = connectionCheckIntervalNanos config.connectionCheckIntervalNanos = connectionCheckIntervalNanos
config.connectionExpirationTimoutNanos = connectionExpirationTimoutNanos config.connectionExpirationTimoutNanos = connectionExpirationTimoutNanos
@ -940,7 +917,6 @@ abstract class Configuration {
if (enableIPv6 != other.enableIPv6) return false if (enableIPv6 != other.enableIPv6) return false
if (enableIpc != other.enableIpc) return false if (enableIpc != other.enableIpc) return false
if (enableRemoteSignatureValidation != other.enableRemoteSignatureValidation) return false if (enableRemoteSignatureValidation != other.enableRemoteSignatureValidation) return false
if (port != other.port) return false
if (connectionCloseTimeoutInSeconds != other.connectionCloseTimeoutInSeconds) return false if (connectionCloseTimeoutInSeconds != other.connectionCloseTimeoutInSeconds) return false
if (connectionCheckIntervalNanos != other.connectionCheckIntervalNanos) return false if (connectionCheckIntervalNanos != other.connectionCheckIntervalNanos) return false
if (connectionExpirationTimoutNanos != other.connectionExpirationTimoutNanos) return false if (connectionExpirationTimoutNanos != other.connectionExpirationTimoutNanos) return false
@ -966,7 +942,6 @@ abstract class Configuration {
result = 31 * result + enableIPv6.hashCode() result = 31 * result + enableIPv6.hashCode()
result = 31 * result + enableIpc.hashCode() result = 31 * result + enableIpc.hashCode()
result = 31 * result + enableRemoteSignatureValidation.hashCode() result = 31 * result + enableRemoteSignatureValidation.hashCode()
result = 31 * result + port
result = 31 * result + pingTimeoutSeconds result = 31 * result + pingTimeoutSeconds
result = 31 * result + connectionCloseTimeoutInSeconds result = 31 * result + connectionCloseTimeoutInSeconds
result = 31 * result + connectionCheckIntervalNanos.hashCode() result = 31 * result + connectionCheckIntervalNanos.hashCode()

View File

@ -172,6 +172,13 @@ open class Server<CONNECTION : Connection>(
@Volatile @Volatile
internal lateinit var handshake: ServerHandshake<CONNECTION> internal lateinit var handshake: ServerHandshake<CONNECTION>
/**
* The machine port that the server will listen for connections on
*/
@Volatile
var port: Int = 0
private set
final override fun newException(message: String, cause: Throwable?): Throwable { final override fun newException(message: String, cause: Throwable?): Throwable {
return ServerException(message, cause) return ServerException(message, cause)
} }
@ -182,11 +189,16 @@ open class Server<CONNECTION : Connection>(
/** /**
* Binds the server to AERON configuration * Binds the server to AERON configuration
*
* @param port this is the network port which will be listening for incoming connections
*/ */
@Suppress("DuplicatedCode") @Suppress("DuplicatedCode")
fun bind() = runBlocking { fun bind(port: Int = 0) = runBlocking {
// NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines! // NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
require(port > 0 || config.enableIpc) { "port must be > 0" }
require(port < 65535) { "port must be < 65535" }
// the lifecycle of a server is the ENDPOINT (measured via the network event poller) // the lifecycle of a server is the ENDPOINT (measured via the network event poller)
if (endpointIsRunning.value) { if (endpointIsRunning.value) {
listenerManager.notifyError(ServerException("Unable to start, the server is already running!")) listenerManager.notifyError(ServerException("Unable to start, the server is already running!"))
@ -208,8 +220,9 @@ open class Server<CONNECTION : Connection>(
return@runBlocking return@runBlocking
} }
config as ServerConfiguration this@Server.port = port
config as ServerConfiguration
// we are done with initial configuration, now initialize aeron and the general state of this endpoint // we are done with initial configuration, now initialize aeron and the general state of this endpoint

View File

@ -16,7 +16,6 @@
package dorkbox.network.aeron.mediaDriver package dorkbox.network.aeron.mediaDriver
import dorkbox.network.ClientConfiguration
import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.getLocalAddressString import dorkbox.network.aeron.AeronDriver.Companion.getLocalAddressString
import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
@ -56,10 +55,11 @@ internal class ClientHandshakeDriver(
autoChangeToIpc: Boolean, autoChangeToIpc: Boolean,
remoteAddress: InetAddress?, remoteAddress: InetAddress?,
remoteAddressString: String, remoteAddressString: String,
config: ClientConfiguration, port: Int,
handshakeTimeoutSec: Int = 10, handshakeTimeoutSec: Int = 10,
reliable: Boolean, reliable: Boolean,
logger: KLogger): ClientHandshakeDriver { logger: KLogger
): ClientHandshakeDriver {
var isUsingIPC = false var isUsingIPC = false
@ -150,8 +150,7 @@ internal class ClientHandshakeDriver(
handshakeTimeoutSec = handshakeTimeoutSec, handshakeTimeoutSec = handshakeTimeoutSec,
remoteAddress = remoteAddress, remoteAddress = remoteAddress,
remoteAddressString = remoteAddressString, remoteAddressString = remoteAddressString,
portPub = config.port, portPub = port,
portSub = config.port,
sessionIdPub = sessionIdPub, sessionIdPub = sessionIdPub,
streamIdPub = streamIdPub, streamIdPub = streamIdPub,
reliable = reliable, reliable = reliable,
@ -217,7 +216,6 @@ internal class ClientHandshakeDriver(
remoteAddress: InetAddress, remoteAddress: InetAddress,
remoteAddressString: String, remoteAddressString: String,
portPub: Int, portPub: Int,
portSub: Int,
sessionIdPub: Int, sessionIdPub: Int,
streamIdPub: Int, streamIdPub: Int,
reliable: Boolean, reliable: Boolean,
@ -254,20 +252,20 @@ internal class ClientHandshakeDriver(
// Create a subscription the given address and port, using the given stream ID. // Create a subscription the given address and port, using the given stream ID.
var subscription: Subscription? = null var subscription: Subscription? = null
var retryCount = 100 var retryCount = 100
val random = Random() val random = CryptoManagement.secureRandom
val isSameMachine = remoteAddress.isLoopbackAddress || remoteAddress == EndPoint.lanAddress val isSameMachine = remoteAddress.isLoopbackAddress || remoteAddress == EndPoint.lanAddress
var actualPortSub = portSub var portSub = random.nextInt(Short.MAX_VALUE-1025) + 1025
while (subscription == null && retryCount-- > 0) { while (subscription == null && retryCount-- > 0) {
// find a random port to bind to if we are loopback OR if we are the same IP address (not loopback, but to ourselves) // find a random port to bind to if we are loopback OR if we are the same IP address (not loopback, but to ourselves)
if (isSameMachine) { if (isSameMachine) {
// range from 1025-65534 // range from 1025-65534
actualPortSub = random.nextInt(Short.MAX_VALUE-1025) + 1025 portSub = random.nextInt(Short.MAX_VALUE-1025) + 1025
} }
try { try {
val subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, reliable) val subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, reliable)
.endpoint(isRemoteIpv4, localAddressString, actualPortSub) .endpoint(isRemoteIpv4, localAddressString, portSub)
subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)
} catch (ignored: Exception) { } catch (ignored: Exception) {
@ -286,7 +284,7 @@ internal class ClientHandshakeDriver(
streamIdPub, streamIdSub, streamIdPub, streamIdSub,
reliable, reliable,
remoteAddress, remoteAddressString, remoteAddress, remoteAddressString,
portPub, actualPortSub) portPub, portSub)
} }
} }

View File

@ -33,8 +33,8 @@ internal class ServerHandshakeDriver(private val aeronDriver: AeronDriver, val s
aeronDriver: AeronDriver, aeronDriver: AeronDriver,
isIpc: Boolean, isIpc: Boolean,
ipInfo: IpInfo, ipInfo: IpInfo,
port: Int,
streamIdSub: Int, sessionIdSub: Int, streamIdSub: Int, sessionIdSub: Int,
isReliable: Boolean,
logInfo: String logInfo: String
): ServerHandshakeDriver { ): ServerHandshakeDriver {
@ -42,16 +42,14 @@ internal class ServerHandshakeDriver(private val aeronDriver: AeronDriver, val s
val subscriptionUri: ChannelUriStringBuilder val subscriptionUri: ChannelUriStringBuilder
if (isIpc) { if (isIpc) {
subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, isReliable) subscriptionUri = uriHandshake(CommonContext.IPC_MEDIA, true)
info = "$logInfo [$sessionIdSub|$streamIdSub]" info = "$logInfo [$sessionIdSub|$streamIdSub]"
} else { } else {
val port = ipInfo.port
// are we ipv4 or ipv6 or ipv6wildcard? // are we ipv4 or ipv6 or ipv6wildcard?
subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, isReliable) subscriptionUri = uriHandshake(CommonContext.UDP_MEDIA, ipInfo.isReliable)
.endpoint(ipInfo.getAeronPubAddress(ipInfo.isIpv4) + ":" + port) .endpoint(ipInfo.getAeronPubAddress(ipInfo.isIpv4) + ":" + port)
info = "$logInfo ${ipInfo.listenAddressStringPretty} [$sessionIdSub|$streamIdSub|$port] (reliable:$isReliable)" info = "$logInfo ${ipInfo.listenAddressStringPretty} [$sessionIdSub|$streamIdSub|$port] (reliable:${ipInfo.isReliable})"
} }
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo) val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSub, logInfo)

View File

@ -108,7 +108,6 @@ internal class IpInfo(config: ServerConfiguration) {
val listenAddressString: String val listenAddressString: String
val formattedListenAddressString: String val formattedListenAddressString: String
val listenAddressStringPretty: String val listenAddressStringPretty: String
val port = config.port
val isReliable = config.isReliable val isReliable = config.isReliable
val isIpv4: Boolean val isIpv4: Boolean
@ -230,6 +229,7 @@ internal class IpInfo(config: ServerConfiguration) {
} }
} }
// localhost/loopback IP might not always be 127.0.0.1 or ::1 // localhost/loopback IP might not always be 127.0.0.1 or ::1
// We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this) // We want to listen on BOTH IPv4 and IPv6 (config option lets us configure this)
// val listenIPv4Address: InetAddress? = // val listenIPv4Address: InetAddress? =

View File

@ -21,13 +21,7 @@ import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator
import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
import dorkbox.network.aeron.mediaDriver.ServerConnectionDriver import dorkbox.network.aeron.mediaDriver.ServerConnectionDriver
import dorkbox.network.connection.Connection import dorkbox.network.connection.*
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.EventDispatcher
import dorkbox.network.connection.Handshaker
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.exceptions.AllocationException import dorkbox.network.exceptions.AllocationException
import dorkbox.network.exceptions.ServerHandshakeException import dorkbox.network.exceptions.ServerHandshakeException
import dorkbox.network.exceptions.ServerTimedoutException import dorkbox.network.exceptions.ServerTimedoutException
@ -512,7 +506,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
// the pub/sub do not necessarily have to be the same. They can be ANY port // the pub/sub do not necessarily have to be the same. They can be ANY port
val portPub = message.port val portPub = message.port
val portSub = config.port val portSub = server.port
val logType = if (clientAddress is Inet4Address) { val logType = if (clientAddress is Inet4Address) {
"IPv4" "IPv4"

View File

@ -249,10 +249,10 @@ internal object ServerHandshakePollers {
val driver = ServerHandshakeDriver.build( val driver = ServerHandshakeDriver.build(
aeronDriver = server.aeronDriver, aeronDriver = server.aeronDriver,
isIpc = true, isIpc = true,
port = 0,
ipInfo = server.ipInfo, ipInfo = server.ipInfo,
streamIdSub = config.ipcId, streamIdSub = config.ipcId,
sessionIdSub = AeronDriver.RESERVED_SESSION_ID_INVALID, sessionIdSub = AeronDriver.RESERVED_SESSION_ID_INVALID,
isReliable = true,
logInfo = "HANDSHAKE-IPC" logInfo = "HANDSHAKE-IPC"
) )
@ -302,9 +302,9 @@ internal object ServerHandshakePollers {
aeronDriver = server.aeronDriver, aeronDriver = server.aeronDriver,
isIpc = false, isIpc = false,
ipInfo = server.ipInfo, ipInfo = server.ipInfo,
port = server.port,
streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID, streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionIdSub = 9, sessionIdSub = 9,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv4" logInfo = "HANDSHAKE-IPv4"
) )
@ -352,9 +352,9 @@ internal object ServerHandshakePollers {
aeronDriver = server.aeronDriver, aeronDriver = server.aeronDriver,
isIpc = false, isIpc = false,
ipInfo = server.ipInfo, ipInfo = server.ipInfo,
port = server.port,
streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID, streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionIdSub = 0, sessionIdSub = 0,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv6" logInfo = "HANDSHAKE-IPv6"
) )
@ -404,9 +404,9 @@ internal object ServerHandshakePollers {
aeronDriver = server.aeronDriver, aeronDriver = server.aeronDriver,
isIpc = false, isIpc = false,
ipInfo = server.ipInfo, ipInfo = server.ipInfo,
port = server.port,
streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID, streamIdSub = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionIdSub = 0, sessionIdSub = 0,
isReliable = isReliable,
logInfo = "HANDSHAKE-IPv4+6" logInfo = "HANDSHAKE-IPv4+6"
) )

View File

@ -106,7 +106,6 @@ object AeronClient {
fun main(args: Array<String>) { fun main(args: Array<String>) {
val configuration = ClientConfiguration() val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = false configuration.enableIpc = false
configuration.enableIPv4 = true configuration.enableIPv4 = true
@ -138,7 +137,7 @@ object AeronClient {
logger.error("HAS MESSAGE! $message") logger.error("HAS MESSAGE! $message")
} }
client.connect("127.0.0.1") // UDP connection via loopback client.connect("127.0.0.1", 2000) // UDP connection via loopback
// different ones needed // different ones needed

View File

@ -123,7 +123,6 @@ class AeronClientServer {
fun client(remoteAddress: String) { fun client(remoteAddress: String) {
val configuration = ClientConfiguration() val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = false configuration.enableIpc = false
// configuration.enableIPv4 = false // configuration.enableIPv4 = false
@ -155,7 +154,7 @@ class AeronClientServer {
logger.error("HAS MESSAGE! $message") logger.error("HAS MESSAGE! $message")
} }
client.connect(remoteAddress) // UDP connection via loopback client.connect(remoteAddress, 2000) // UDP connection via loopback
// different ones needed // different ones needed
@ -190,7 +189,6 @@ class AeronClientServer {
val configuration = ServerConfiguration() val configuration = ServerConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.listenIpAddress = "*" configuration.listenIpAddress = "*"
configuration.port = 2000
configuration.maxClientCount = 50 configuration.maxClientCount = 50
configuration.enableIpc = false configuration.enableIpc = false
@ -242,7 +240,7 @@ class AeronClientServer {
send("ECHO $message") send("ECHO $message")
} }
server.bind() server.bind(2000)
return server return server
} }

View File

@ -116,7 +116,6 @@ class AeronRmiClientServer {
try { try {
val configuration = ClientConfiguration() val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = false configuration.enableIpc = false
configuration.enableIPv6 = false configuration.enableIPv6 = false
@ -125,7 +124,7 @@ class AeronRmiClientServer {
if (args.contains("client")) { if (args.contains("client")) {
val client = acs.client(0, configuration) val client = acs.client(0, configuration)
client.connect("172.31.73.222") // UDP connection via loopback client.connect("172.31.73.222", 2000) // UDP connection via loopback
runBlocking { runBlocking {
delay(Long.MAX_VALUE) delay(Long.MAX_VALUE)
@ -149,7 +148,7 @@ class AeronRmiClientServer {
clients.forEachIndexed { index, client -> clients.forEachIndexed { index, client ->
// client.connect() // client.connect()
client.connect("172.31.73.222") client.connect("172.31.73.222", 2000)
} }
System.err.println("DONE") System.err.println("DONE")
@ -242,7 +241,6 @@ class AeronRmiClientServer {
val configuration = ServerConfiguration() val configuration = ServerConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.listenIpAddress = "*" configuration.listenIpAddress = "*"
configuration.port = 2000
configuration.maxClientCount = 50 configuration.maxClientCount = 50
configuration.enableIpc = true configuration.enableIpc = true
@ -289,7 +287,7 @@ class AeronRmiClientServer {
throwable.printStackTrace() throwable.printStackTrace()
} }
server.bind() server.bind(2000)
return server return server
} }
} }

View File

@ -90,7 +90,6 @@ object AeronServer {
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.listenIpAddress = "*" configuration.listenIpAddress = "*"
// configuration.listenIpAddress = "127.0.0.1" // configuration.listenIpAddress = "127.0.0.1"
configuration.port = 2000
configuration.maxClientCount = 50 configuration.maxClientCount = 50
// configuration.enableIpc = true // configuration.enableIpc = true
@ -141,7 +140,7 @@ object AeronServer {
send("ECHO $message") send("ECHO $message")
} }
server.bind() server.bind(2000)
runBlocking { runBlocking {
server.waitForClose() server.waitForClose()

View File

@ -21,11 +21,7 @@ import ch.qos.logback.classic.encoder.PatternLayoutEncoder
import ch.qos.logback.classic.joran.JoranConfigurator import ch.qos.logback.classic.joran.JoranConfigurator
import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.ConsoleAppender import ch.qos.logback.core.ConsoleAppender
import dorkbox.network.Client import dorkbox.network.*
import dorkbox.network.ClientConfiguration
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint import dorkbox.network.connection.EndPoint
@ -35,11 +31,7 @@ import dorkbox.storage.Storage
import dorkbox.util.entropy.Entropy import dorkbox.util.entropy.Entropy
import dorkbox.util.entropy.SimpleEntropy import dorkbox.util.entropy.SimpleEntropy
import dorkbox.util.exceptions.InitializationException import dorkbox.util.exceptions.InitializationException
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.*
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.After import org.junit.After
import org.junit.Assert import org.junit.Assert
import org.junit.Before import org.junit.Before
@ -99,7 +91,6 @@ abstract class BaseTest {
val configuration = ClientConfiguration() val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = false configuration.enableIpc = false
configuration.enableIPv6 = false configuration.enableIPv6 = false
@ -111,7 +102,6 @@ abstract class BaseTest {
fun serverConfig(block: ServerConfiguration.() -> Unit = {}): ServerConfiguration { fun serverConfig(block: ServerConfiguration.() -> Unit = {}): ServerConfiguration {
val configuration = ServerConfiguration() val configuration = ServerConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.port = 2000
configuration.enableIpc = false configuration.enableIpc = false
configuration.enableIPv6 = false configuration.enableIPv6 = false

View File

@ -38,7 +38,7 @@ class ConnectionFilterTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
serverConnectSuccess.value = true serverConnectSuccess.value = true
@ -61,7 +61,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -86,7 +86,7 @@ class ConnectionFilterTest : BaseTest() {
addEndPoint(server) addEndPoint(server)
server.filter(IpSubnetFilterRule(IPv4.WILDCARD, 0)) server.filter(IpSubnetFilterRule(IPv4.WILDCARD, 0))
server.filter(IpSubnetFilterRule(IPv6.WILDCARD, 0)) server.filter(IpSubnetFilterRule(IPv6.WILDCARD, 0))
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
serverConnectSuccess.value = true serverConnectSuccess.value = true
@ -109,7 +109,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -132,7 +132,7 @@ class ConnectionFilterTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.filter(IpSubnetFilterRule("1.1.1.1", 0)) server.filter(IpSubnetFilterRule("1.1.1.1", 0))
server.filter(IpSubnetFilterRule("::1.1.1.1", 0)) // compressed ipv6 server.filter(IpSubnetFilterRule("::1.1.1.1", 0)) // compressed ipv6
@ -157,7 +157,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -187,7 +187,7 @@ class ConnectionFilterTest : BaseTest() {
serverConnectSuccess.value = true serverConnectSuccess.value = true
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -206,7 +206,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -234,7 +234,7 @@ class ConnectionFilterTest : BaseTest() {
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -248,7 +248,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -280,7 +280,7 @@ class ConnectionFilterTest : BaseTest() {
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -301,7 +301,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
stopEndPointsBlocking() stopEndPointsBlocking()
@ -327,7 +327,7 @@ class ConnectionFilterTest : BaseTest() {
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -342,7 +342,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -371,7 +371,7 @@ class ConnectionFilterTest : BaseTest() {
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -389,7 +389,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -416,7 +416,7 @@ class ConnectionFilterTest : BaseTest() {
serverConnectSuccess.value = true serverConnectSuccess.value = true
close() close()
} }
server.bind() server.bind(2000)
} }
run { run {
@ -435,7 +435,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -457,7 +457,7 @@ class ConnectionFilterTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.filter { server.filter {
false false
} }
@ -478,7 +478,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e
@ -498,7 +498,7 @@ class ConnectionFilterTest : BaseTest() {
server.filter { server.filter {
false false
} }
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
close() close()
@ -517,7 +517,7 @@ class ConnectionFilterTest : BaseTest() {
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPointsBlocking() stopEndPointsBlocking()
throw e throw e

View File

@ -42,7 +42,7 @@ class DisconnectReconnectTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
logger.error("Disconnecting after 2 seconds.") logger.error("Disconnecting after 2 seconds.")
@ -66,11 +66,11 @@ class DisconnectReconnectTest : BaseTest() {
val count = reconnectCount.getAndIncrement() val count = reconnectCount.getAndIncrement()
if (count < reconnects) { if (count < reconnects) {
logger.error("Reconnecting: $count") logger.error("Reconnecting: $count")
client.connect(LOCALHOST) client.reconnect()
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
latch.await() latch.await()
@ -93,7 +93,7 @@ class DisconnectReconnectTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
} }
run { run {
@ -119,11 +119,11 @@ class DisconnectReconnectTest : BaseTest() {
val count = reconnectCount.getAndIncrement() val count = reconnectCount.getAndIncrement()
if (count < reconnects) { if (count < reconnects) {
logger.error("Reconnecting: $count") logger.error("Reconnecting: $count")
client.connect(LOCALHOST) client.reconnect()
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
@ -165,7 +165,7 @@ class DisconnectReconnectTest : BaseTest() {
val server: Server<Connection> = Server(config) val server: Server<Connection> = Server(config)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
@ -202,11 +202,11 @@ class DisconnectReconnectTest : BaseTest() {
val count = reconnectCount.getAndIncrement() val count = reconnectCount.getAndIncrement()
if (count < reconnects) { if (count < reconnects) {
logger.error("Reconnecting: $count") logger.error("Reconnecting: $count")
client.connect(LOCALHOST) client.reconnect()
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
latch.await() latch.await()
@ -234,7 +234,7 @@ class DisconnectReconnectTest : BaseTest() {
val serverConfiguration = serverConfig() val serverConfiguration = serverConfig()
val server: Server<Connection> = Server(serverConfiguration) val server: Server<Connection> = Server(serverConfiguration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
logger.error("Disconnecting after 2 seconds.") logger.error("Disconnecting after 2 seconds.")
@ -259,11 +259,11 @@ class DisconnectReconnectTest : BaseTest() {
val count = reconnectCount.getAndIncrement() val count = reconnectCount.getAndIncrement()
if (count < reconnects) { if (count < reconnects) {
logger.error("Reconnecting: $count") logger.error("Reconnecting: $count")
client.connect(LOCALHOST) client.reconnect()
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
@ -291,7 +291,7 @@ class DisconnectReconnectTest : BaseTest() {
val server: Server<Connection> = Server(config) val server: Server<Connection> = Server(config)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
logger.error("Disconnecting after 2 seconds.") logger.error("Disconnecting after 2 seconds.")
@ -317,11 +317,11 @@ class DisconnectReconnectTest : BaseTest() {
val count = reconnectCount.getAndIncrement() val count = reconnectCount.getAndIncrement()
if (count < reconnects) { if (count < reconnects) {
logger.error("Reconnecting: $count") logger.error("Reconnecting: $count")
client.connect(LOCALHOST) client.reconnect()
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
latch.await() latch.await()
@ -342,7 +342,7 @@ class DisconnectReconnectTest : BaseTest() {
server = Server(config) server = Server(config)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
logger.error("Connected!") logger.error("Connected!")
@ -366,7 +366,7 @@ class DisconnectReconnectTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
server.close() server.close()

View File

@ -54,7 +54,7 @@ class ErrorLoggerTest : BaseTest() {
throw Exception("server ERROR. SHOULD BE CAUGHT") throw Exception("server ERROR. SHOULD BE CAUGHT")
} }
server.bind() server.bind(2000)
} }
run { run {
@ -74,7 +74,7 @@ class ErrorLoggerTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -116,7 +116,7 @@ class ListenerTest : BaseTest() {
serverDisconnect.lazySet(true) serverDisconnect.lazySet(true)
} }
server.bind() server.bind(2000)
@ -163,7 +163,7 @@ class ListenerTest : BaseTest() {
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
waitForThreads() waitForThreads()

View File

@ -66,7 +66,6 @@ class MultipleServerTest : BaseTest() {
didReceive.add(AtomicBoolean()) didReceive.add(AtomicBoolean())
val configuration = serverConfig() val configuration = serverConfig()
configuration.port += count
configuration.aeronDirectory = serverAeronDir configuration.aeronDirectory = serverAeronDir
configuration.enableIpc = false configuration.enableIpc = false
@ -85,7 +84,7 @@ class MultipleServerTest : BaseTest() {
} }
} }
server.bind() server.bind(2000 + count)
serverAeronDir = File(configuration.aeronDirectory.toString() + count) serverAeronDir = File(configuration.aeronDirectory.toString() + count)
} }
@ -96,7 +95,6 @@ class MultipleServerTest : BaseTest() {
for (count in 0 until total) { for (count in 0 until total) {
didSend.add(AtomicBoolean()) didSend.add(AtomicBoolean())
val configuration = clientConfig() val configuration = clientConfig()
configuration.port += count
configuration.aeronDirectory = clientAeronDir configuration.aeronDirectory = clientAeronDir
configuration.enableIpc = false configuration.enableIpc = false
@ -111,7 +109,7 @@ class MultipleServerTest : BaseTest() {
send("client_$count") send("client_$count")
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000+count)
} }
waitForThreads() waitForThreads()
@ -194,7 +192,7 @@ class MultipleServerTest : BaseTest() {
} }
} }
server.bind() server.bind(2000+count)
} }
val didSend = mutableListOf<AtomicBoolean>() val didSend = mutableListOf<AtomicBoolean>()
@ -221,7 +219,7 @@ class MultipleServerTest : BaseTest() {
send("client_$count") send("client_$count")
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000+count)
} }
waitForThreads() waitForThreads()

View File

@ -62,7 +62,7 @@ class PingPongTest : BaseTest() {
val server: Server<Connection> = Server(config) val server: Server<Connection> = Server(config)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onError { throwable -> server.onError { throwable ->
fail = "Error during processing. $throwable" fail = "Error during processing. $throwable"
@ -108,7 +108,7 @@ class PingPongTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }

View File

@ -35,7 +35,7 @@ class PingTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
} }
run { run {
@ -62,7 +62,7 @@ class PingTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -38,7 +38,7 @@ class RoundTripMessageTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onMessage<PingMessage> { ping -> server.onMessage<PingMessage> { ping ->
serverSuccess.value = true serverSuccess.value = true
@ -76,7 +76,7 @@ class RoundTripMessageTest : BaseTest() {
send(ping) send(ping)
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -36,7 +36,7 @@ class SerializationValidationTest : BaseTest() {
server.onMessage<FinishedCommand> { _ -> server.onMessage<FinishedCommand> { _ ->
stopEndPoints() stopEndPoints()
} }
server.bind() server.bind(2000)
} }
@ -50,7 +50,7 @@ class SerializationValidationTest : BaseTest() {
send(FinishedCommand()) send(FinishedCommand())
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
@ -72,7 +72,7 @@ class SerializationValidationTest : BaseTest() {
server.onMessage<TestObject> { _ -> server.onMessage<TestObject> { _ ->
stopEndPoints() stopEndPoints()
} }
server.bind() server.bind(2000)
} }
@ -97,7 +97,7 @@ class SerializationValidationTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
@ -118,7 +118,7 @@ class SerializationValidationTest : BaseTest() {
server.onMessage<TestObject> { _ -> server.onMessage<TestObject> { _ ->
stopEndPoints() stopEndPoints()
} }
server.bind() server.bind(2000)
} }
@ -143,7 +143,7 @@ class SerializationValidationTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -165,7 +165,6 @@ class SimpleTest : BaseTest() {
run { run {
val configuration = serverConfig() val configuration = serverConfig()
configuration.port = 12312
configuration.enableIPv4 = serverType.ip4 configuration.enableIPv4 = serverType.ip4
configuration.enableIPv6 = serverType.ip6 configuration.enableIPv6 = serverType.ip6
@ -186,12 +185,11 @@ class SimpleTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
server.bind() server.bind(12312)
} }
run { run {
val configuration = clientConfig() val configuration = clientConfig()
configuration.port = 12312
configuration.enableIPv4 = clientType.ip4 configuration.enableIPv4 = clientType.ip4
configuration.enableIPv6 = clientType.ip6 configuration.enableIPv6 = clientType.ip6
@ -207,15 +205,15 @@ class SimpleTest : BaseTest() {
} }
when (clientType) { when (clientType) {
ConnectType.IPC -> { client.connect() } ConnectType.IPC -> { client.connectIpc() }
ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST, 12312) }
} }
} }
@ -232,7 +230,6 @@ class SimpleTest : BaseTest() {
run { run {
val configuration = serverConfig() val configuration = serverConfig()
configuration.port = 12312
configuration.enableIPv4 = serverType.ip4 configuration.enableIPv4 = serverType.ip4
configuration.enableIPv6 = serverType.ip6 configuration.enableIPv6 = serverType.ip6
@ -251,12 +248,11 @@ class SimpleTest : BaseTest() {
close() close()
} }
server.bind() server.bind(12312)
} }
run { run {
val configuration = clientConfig() val configuration = clientConfig()
configuration.port = 12312
configuration.enableIPv4 = clientType.ip4 configuration.enableIPv4 = clientType.ip4
configuration.enableIPv6 = clientType.ip6 configuration.enableIPv6 = clientType.ip6
@ -276,15 +272,15 @@ class SimpleTest : BaseTest() {
} }
when (clientType) { when (clientType) {
ConnectType.IPC -> { client.connect() } ConnectType.IPC -> { client.connectIpc() }
ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST, 12312) }
ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST) } ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST, 12312) }
ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST) } ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST, 12312) }
} }
} }

View File

@ -40,7 +40,7 @@ class StreamingTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onMessage<ByteArray> { server.onMessage<ByteArray> {
println("received data, shutting down!") println("received data, shutting down!")
@ -63,7 +63,7 @@ class StreamingTest : BaseTest() {
send(hugeData) send(hugeData)
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -61,7 +61,7 @@ class RmiDelayedInvocationTest : BaseTest() {
addEndPoint(server) addEndPoint(server)
server.rmiGlobal.save(TestObjectImpl(countDownLatch), OBJ_ID) server.rmiGlobal.save(TestObjectImpl(countDownLatch), OBJ_ID)
server.bind() server.bind(2000)
} }
run { run {
@ -106,7 +106,7 @@ class RmiDelayedInvocationTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()

View File

@ -80,11 +80,11 @@ class RmiDuplicateObjectTest : BaseTest() {
private fun doConnect(isIpv4: Boolean, isIpv6: Boolean, runIpv4Connect: Boolean, client: Client<Connection>) { private fun doConnect(isIpv4: Boolean, isIpv6: Boolean, runIpv4Connect: Boolean, client: Client<Connection>) {
when { when {
isIpv4 && isIpv6 && runIpv4Connect -> client.connect(IPv4.LOCALHOST) isIpv4 && isIpv6 && runIpv4Connect -> client.connect(IPv4.LOCALHOST, 2000)
isIpv4 && isIpv6 && !runIpv4Connect -> client.connect(IPv6.LOCALHOST) isIpv4 && isIpv6 && !runIpv4Connect -> client.connect(IPv6.LOCALHOST, 2000)
isIpv4 -> client.connect(IPv4.LOCALHOST) isIpv4 -> client.connect(IPv4.LOCALHOST, 2000)
isIpv6 -> client.connect(IPv6.LOCALHOST) isIpv6 -> client.connect(IPv6.LOCALHOST, 2000)
else -> client.connect(IPv4.LOCALHOST) else -> client.connect(IPv4.LOCALHOST, 2000)
} }
} }
@ -106,7 +106,7 @@ class RmiDuplicateObjectTest : BaseTest() {
val server = Server<Connection>(configuration) val server = Server<Connection>(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onConnect { server.onConnect {
// these are on separate threads (client.init) and this -- there can be race conditions, where the object doesn't exist yet! // these are on separate threads (client.init) and this -- there can be race conditions, where the object doesn't exist yet!

View File

@ -75,7 +75,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
server.bind() server.bind(2000)
} }
@ -116,7 +116,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
} }
@ -141,7 +141,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
server.bind() server.bind(2000)
} }
@ -181,7 +181,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
} }
@ -206,7 +206,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
server.bind() server.bind(2000)
} }
@ -240,7 +240,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
} }
@ -279,7 +279,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
server.bind() server.bind(2000)
} }
@ -300,7 +300,7 @@ class RmiNestedTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
waitForThreads() waitForThreads()
} }

View File

@ -73,11 +73,11 @@ class RmiSimpleActionsTest : BaseTest() {
private fun doConnect(isIpv4: Boolean, isIpv6: Boolean, runIpv4Connect: Boolean, client: Client<Connection>) { private fun doConnect(isIpv4: Boolean, isIpv6: Boolean, runIpv4Connect: Boolean, client: Client<Connection>) {
when { when {
isIpv4 && isIpv6 && runIpv4Connect -> client.connect(IPv4.LOCALHOST) isIpv4 && isIpv6 && runIpv4Connect -> client.connect(IPv4.LOCALHOST, 2000)
isIpv4 && isIpv6 && !runIpv4Connect -> client.connect(IPv6.LOCALHOST) isIpv4 && isIpv6 && !runIpv4Connect -> client.connect(IPv6.LOCALHOST, 2000)
isIpv4 -> client.connect(IPv4.LOCALHOST) isIpv4 -> client.connect(IPv4.LOCALHOST, 2000)
isIpv6 -> client.connect(IPv6.LOCALHOST) isIpv6 -> client.connect(IPv6.LOCALHOST, 2000)
else -> client.connect() else -> client.connectIpc()
} }
} }
@ -97,7 +97,7 @@ class RmiSimpleActionsTest : BaseTest() {
val server = Server<Connection>(configuration) val server = Server<Connection>(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onMessage<MessageWithTestCow> { m -> server.onMessage<MessageWithTestCow> { m ->
server.logger.error("Received finish signal for test for: Client -> Server") server.logger.error("Received finish signal for test for: Client -> Server")

View File

@ -177,7 +177,7 @@ class RmiSimpleTest : BaseTest() {
val server = Server<Connection>(configuration) val server = Server<Connection>(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.rmiGlobal.save(TestCowImpl(44), 44) server.rmiGlobal.save(TestCowImpl(44), 44)
@ -230,19 +230,21 @@ class RmiSimpleTest : BaseTest() {
// this creates a GLOBAL object on the server (instead of a connection specific object) // this creates a GLOBAL object on the server (instead of a connection specific object)
runBlocking { runBlocking {
// fix me! // fix me!
} }
when (clientType) { when (clientType) {
ConnectType.IPC -> client.connect() ConnectType.IPC -> client.connectIpc()
ConnectType.IPC4 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC4 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IPC6 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC6 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IPC46 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC46 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IPC64 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC64 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IP4 -> client.connect(IPv4.LOCALHOST) ConnectType.IP4 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IP6 -> client.connect(IPv6.LOCALHOST) ConnectType.IP6 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IP46 -> client.connect(IPv4.LOCALHOST) ConnectType.IP46 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IP64 -> client.connect(IPv6.LOCALHOST) ConnectType.IP64 -> client.connect(IPv6.LOCALHOST, 2000)
} }
} }
@ -265,7 +267,7 @@ class RmiSimpleTest : BaseTest() {
val server = Server<Connection>(configuration) val server = Server<Connection>(configuration)
addEndPoint(server) addEndPoint(server)
server.bind() server.bind(2000)
server.onMessage<MessageWithTestCow> { m -> server.onMessage<MessageWithTestCow> { m ->
server.logger.error("Received finish signal for test for: Client -> Server") server.logger.error("Received finish signal for test for: Client -> Server")
@ -313,15 +315,15 @@ class RmiSimpleTest : BaseTest() {
} }
when (clientType) { when (clientType) {
ConnectType.IPC -> client.connect() ConnectType.IPC -> client.connectIpc()
ConnectType.IPC4 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC4 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IPC6 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC6 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IPC46 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC46 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IPC64 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC64 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IP4 -> client.connect(IPv4.LOCALHOST) ConnectType.IP4 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IP6 -> client.connect(IPv6.LOCALHOST) ConnectType.IP6 -> client.connect(IPv6.LOCALHOST, 2000)
ConnectType.IP46 -> client.connect(IPv4.LOCALHOST) ConnectType.IP46 -> client.connect(IPv4.LOCALHOST, 2000)
ConnectType.IP64 -> client.connect(IPv6.LOCALHOST) ConnectType.IP64 -> client.connect(IPv6.LOCALHOST, 2000)
} }
} }

View File

@ -65,7 +65,7 @@ class RmiSpamAsyncTest : BaseTest() {
addEndPoint(server) addEndPoint(server)
server.rmiGlobal.save(TestObjectImpl(latch), RMI_ID) server.rmiGlobal.save(TestObjectImpl(latch), RMI_ID)
server.bind() server.bind(2000)
} }
@ -106,7 +106,7 @@ class RmiSpamAsyncTest : BaseTest() {
} }
} }
client.connect(LOCALHOST) client.connect(LOCALHOST, 2000)
} }
latch.await() latch.await()

View File

@ -70,7 +70,7 @@ class RmiSpamSyncSuspendingTest : BaseTest() {
addEndPoint(server) addEndPoint(server)
server.rmiGlobal.save(TestObjectImpl(counter), RMI_ID) server.rmiGlobal.save(TestObjectImpl(counter), RMI_ID)
server.bind() server.bind(2000)
} }
@ -112,7 +112,11 @@ class RmiSpamSyncSuspendingTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
client.connect() if (configuration.enableIpc) {
client.connectIpc()
} else {
client.connect(LOCALHOST, 2000)
}
} }
waitForThreads() waitForThreads()

View File

@ -69,7 +69,7 @@ class RmiSpamSyncTest : BaseTest() {
addEndPoint(server) addEndPoint(server)
server.rmiGlobal.save(TestObjectImpl(counter), RMI_ID) server.rmiGlobal.save(TestObjectImpl(counter), RMI_ID)
server.bind() server.bind(2000)
} }
@ -111,7 +111,11 @@ class RmiSpamSyncTest : BaseTest() {
stopEndPoints() stopEndPoints()
} }
client.connect() if (configuration.enableIpc) {
client.connectIpc()
} else {
client.connect(LOCALHOST, 2000)
}
} }
waitForThreads() waitForThreads()

View File

@ -1,6 +1,6 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -102,7 +102,7 @@ object TestClient {
close() close()
} }
client.connect(BaseTest.LOCALHOST) client.connect(BaseTest.LOCALHOST, 2000)
runBlocking { runBlocking {
client.waitForClose() client.waitForClose()

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2020 dorkbox, llc * Copyright 2023 dorkbox, llc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -87,7 +87,7 @@ object TestServer {
// } // }
} }
server.bind() server.bind(2000)
runBlocking { runBlocking {
server.waitForClose() server.waitForClose()