Changed Aeron connections from MDC -> "normal" (so the network connections are initialized more like a standard UDP connection would be, instead of the more complicated MDC method

This commit is contained in:
Robinson 2022-08-02 12:11:36 +02:00
parent d5397dde7b
commit 20226468c4
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
29 changed files with 448 additions and 427 deletions

View File

@ -24,17 +24,15 @@ import dorkbox.netUtil.Inet4
import dorkbox.netUtil.Inet6
import dorkbox.netUtil.dnsUtils.ResolvedAddressTypes
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.ClientIpc_MediaDriver
import dorkbox.network.aeron.ClientUdp_MediaDriver
import dorkbox.network.aeron.MediaDriverClient
import dorkbox.network.aeron.MediaDriverConnectInfo
import dorkbox.network.aeron.MediaDriverConnection
import dorkbox.network.aeron.mediaDriver.ClientIpcDriver
import dorkbox.network.aeron.mediaDriver.ClientUdpDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverClient
import dorkbox.network.aeron.mediaDriver.MediaDriverConnectInfo
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.PublicKeyValidationState
import dorkbox.network.connection.eventLoop
import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientRetryException
@ -234,7 +232,7 @@ open class Client<CONNECTION : Connection>(
/**
* Will attempt to connect to the server via IPC, with a default 30 second connection timeout and will block until completed.
*
* @param ipcPublicationId The IPC publication address for the client to connect to
* @param ipcId The IPC publication address for the client to connect to
* @param ipcSubscriptionId The IPC subscription address for the client to connect to
* @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
*
@ -244,19 +242,12 @@ open class Client<CONNECTION : Connection>(
*/
@Suppress("DuplicatedCode")
fun connectIpc(
ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
ipcId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
connectionTimeoutSec: Int = 30)
{
// Default IPC ports are flipped because they are in the perspective of the SERVER
require(ipcPublicationId != ipcSubscriptionId) { "IPC publication and subscription ports cannot be the same! The must match the server's configuration." }
connect(remoteAddress = null, // required!
remoteAddressString = "IPC",
ipcPublicationId = ipcPublicationId,
ipcSubscriptionId = ipcSubscriptionId,
ipcId = ipcId,
connectionTimeoutSec = connectionTimeoutSec)
}
@ -286,7 +277,6 @@ open class Client<CONNECTION : Connection>(
* @throws ClientTimedOutException if the client is unable to connect in x amount of time
* @throws ClientRejectedException if the client connection is rejected
*/
@Suppress("BlockingMethodInNonBlockingContext")
fun connect(
remoteAddress: String = "",
connectionTimeoutSec: Int = 30,
@ -299,7 +289,7 @@ open class Client<CONNECTION : Connection>(
connectIpc(connectionTimeoutSec = connectionTimeoutSec)
}
IPv4.isPreferred -> {
config.enableIPv4 || IPv4.isPreferred -> {
// we have to check first if it's a valid IPv4 address. If not, maybe it's a DNS lookup
val inet4Address = if (IPv4.isValid(remoteAddress)) {
Inet4.toAddress(remoteAddress)
@ -321,9 +311,9 @@ open class Client<CONNECTION : Connection>(
reliable = reliable)
}
IPv6.isPreferred -> {
config.enableIPv6 || IPv6.isPreferred -> {
// we have to check first if it's a valid IPv6 address. If not, maybe it's a DNS lookup
val inet6Address = if (IPv6.isValid(remoteAddress)) {
var inet6Address = if (IPv6.isValid(remoteAddress)) {
Inet6.toAddress(remoteAddress)
} else {
val client = DnsClient()
@ -337,10 +327,29 @@ open class Client<CONNECTION : Connection>(
throw IllegalArgumentException("The remote address '$remoteAddress' cannot be found.")
}
connect(remoteAddress = inet6Address,
remoteAddressString = remoteAddress,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
when (inet6Address) {
IPv4.LOCALHOST -> {
connect(remoteAddress = IPv6.LOCALHOST,
remoteAddressString = IPv6.toString(IPv6.LOCALHOST),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
}
is Inet4Address -> {
// we can map the IPv4 address to an IPv6 address.
val address = IPv6.toAddress(IPv4.toString(inet6Address), true)!!
connect(remoteAddress = address,
remoteAddressString = IPv6.toString(address),
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
}
else -> {
connect(remoteAddress = inet6Address,
remoteAddressString = remoteAddress,
connectionTimeoutSec = connectionTimeoutSec,
reliable = reliable)
}
}
}
// if there is no preference, then try to connect via IPv4
@ -404,8 +413,7 @@ open class Client<CONNECTION : Connection>(
remoteAddress: InetAddress? = null,
remoteAddressString: String,
// Default IPC ports are flipped because they are in the perspective of the SERVER
ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
ipcId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true)
{
@ -422,8 +430,8 @@ open class Client<CONNECTION : Connection>(
connection0 = null
// localhost/loopback IP might not always be 127.0.0.1 or ::1
this@Client.remoteAddress = remoteAddress
this@Client.remoteAddressString = remoteAddressString
this.remoteAddress = remoteAddress
this.remoteAddressString = remoteAddressString
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
try {
@ -454,7 +462,7 @@ open class Client<CONNECTION : Connection>(
val autoChangeToIpc =
(config.enableIpc && (remoteAddress == null || remoteAddress.isLoopbackAddress)) || (!config.enableIpc && remoteAddress == null)
val handshake = ClientHandshake(crypto, this@Client, logger)
val handshake = ClientHandshake(crypto, this, logger)
val handshakeTimeout = 5
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
@ -469,6 +477,7 @@ open class Client<CONNECTION : Connection>(
throw exception
}
// we have to pre-set the type (which will ultimately get set to the correct type on success)
var type = ""
val localSessionId = crypto.secureRandom.nextInt() + 1 // this isn't SUPER important, but it helps prevent handshake collisions
@ -482,13 +491,14 @@ open class Client<CONNECTION : Connection>(
}
// MAYBE the server doesn't have IPC enabled? If no, we need to connect via network instead
val ipcConnection = ClientIpc_MediaDriver(
streamIdSubscription = ipcSubscriptionId,
streamId = ipcPublicationId,
val ipcConnection = ClientIpcDriver(
streamId = ipcId,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
localSessionId = localSessionId,
)
type = "${ipcConnection.type} '$remoteAddressString:$ipcId'"
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
try {
ipcConnection.build(aeronDriver, logger)
@ -496,7 +506,7 @@ open class Client<CONNECTION : Connection>(
} catch (e: Exception) {
if (remoteAddress == null) {
// if we specified that we MUST use IPC, then we have to throw the exception, because there is no IPC
val clientException = ClientException("Unable to connect via IPC to server. No address was specified", e)
val clientException = ClientException("Unable to connect via IPC to server. No address specified so fallback is unavailable", e)
ListenerManager.cleanStackTraceInternal(clientException)
throw clientException
}
@ -504,10 +514,10 @@ open class Client<CONNECTION : Connection>(
logger.info { "IPC for loopback enabled, but unable to connect. Retrying with address $remoteAddressString" }
// try a UDP connection instead
val udpConnection = ClientUdp_MediaDriver(
val udpConnection = ClientUdpDriver(
address = remoteAddress,
publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort,
addressString = remoteAddressString,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
localSessionId = localSessionId,
@ -515,17 +525,17 @@ open class Client<CONNECTION : Connection>(
isReliable = reliable
)
type = "${udpConnection.type} '$remoteAddressString:${config.port}'"
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.build(aeronDriver, logger)
udpConnection
}
} else {
val udpConnection = ClientUdp_MediaDriver(
val udpConnection = ClientUdpDriver(
address = remoteAddress!!,
publicationPort = config.publicationPort,
subscriptionPort = config.subscriptionPort,
addressString = remoteAddressString,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
localSessionId = localSessionId,
@ -533,21 +543,23 @@ open class Client<CONNECTION : Connection>(
isReliable = reliable
)
type = "${udpConnection.type} '$remoteAddressString:${config.port}'"
// throws a ConnectTimedOutException if the client cannot connect for any reason to the server handshake ports
udpConnection.build(aeronDriver, logger)
udpConnection
}
type = handshakeConnection.type
logger.info { handshakeConnection.info }
connect0(handshake, handshakeConnection, handshakeTimeout)
success = true
// // finished with the handshake, so always close the connection
// aeronDriver.close(handshakeConnection.subscription)
// aeronDriver.close(handshakeConnection.publication)
// finished with the handshake, so always close the connection publication
// The subscription is RE-USED
handshakeConnection.publication.close()
// once we're done with the connection process, stop trying
break
@ -562,15 +574,17 @@ open class Client<CONNECTION : Connection>(
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
sleep(aeronDriver.driverTimeout()+1)
// ALSO, we want to make sure we DO NOT approach the linger timeout!
sleep(aeronDriver.driverTimeout().coerceAtLeast(TimeUnit.NANOSECONDS.toSeconds(aeronDriver.getLingerNs())))
if (logger.isTraceEnabled) {
logger.trace(e) { "Unable to connect to $type '$remoteAddressString', retrying..." }
logger.trace(e) { "Unable to connect to $type, retrying..." }
} else {
logger.info { "Unable to connect to $type '$remoteAddressString', retrying..." }
logger.info { "Unable to connect to $type, retrying..." }
}
} catch (e: Exception) {
logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake. Aborting." }
logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake with $type. Aborting." }
handshake.reset()
listenerManager.notifyError(e)
@ -581,7 +595,7 @@ open class Client<CONNECTION : Connection>(
if (!success) {
if (System.nanoTime() - startTime < timoutInNanos) {
// we timed out. Throw the appropriate exception
val exception = ClientTimedOutException("Unable to connect to the server in $connectionTimeoutSec seconds")
val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds")
logger.error(exception) { "Aborting connection attempt to server." }
listenerManager.notifyError(exception)
throw exception
@ -600,7 +614,7 @@ open class Client<CONNECTION : Connection>(
// the handshake process might have to restart this connection process.
private fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) {
// this will block until the connection timeout, and throw an exception if we were unable to connect with the server
val isUsingIPC = handshakeConnection is ClientIpc_MediaDriver
val isUsingIPC = handshakeConnection is ClientIpcDriver
// throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
@ -632,11 +646,9 @@ open class Client<CONNECTION : Connection>(
// we are now connected, so we can connect to the NEW client-specific ports
val clientConnection = if (isUsingIPC) {
// Create a subscription at the given address and port, using the given stream ID.
val driver = ClientIpc_MediaDriver(sessionId = connectionInfo.sessionId,
// NOTE: pub/sub must be switched!
streamIdSubscription = connectionInfo.publicationPort,
streamId = connectionInfo.subscriptionPort,
localSessionId = 1 // doesn't matter
val driver = ClientIpcDriver(sessionId = connectionInfo.sessionId,
streamId = connectionInfo.port,
localSessionId = 1 // doesn't matter
)
driver.build(aeronDriver, logger)
@ -646,21 +658,20 @@ open class Client<CONNECTION : Connection>(
MediaDriverConnectInfo(
publication = driver.publication,
subscription = handshakeConnection.subscription,
subscriptionPort = driver.streamIdSubscription,
subscriptionPort = driver.localSessionId,
publicationPort = driver.streamId,
streamId = 0, // this is because with IPC, we have stream sub/pub (which are replaced as port sub/pub)
sessionId = driver.sessionId,
sessionId = driver.remoteSessionId,
isReliable = driver.isReliable,
remoteAddress = null,
remoteAddressString = "ipc"
)
}
else {
val driver = ClientUdp_MediaDriver(
address = (handshakeConnection as ClientUdp_MediaDriver).address,
// NOTE: pub/sub must be switched!
publicationPort = connectionInfo.subscriptionPort,
subscriptionPort = connectionInfo.publicationPort,
val driver = ClientUdpDriver(
address = (handshakeConnection as ClientUdpDriver).address,
addressString = handshakeConnection.addressString,
port = connectionInfo.port, // this is the port that we connect to
streamId = connectionInfo.streamId,
sessionId = connectionInfo.sessionId,
localSessionId = 1, // doesn't matter here
@ -675,16 +686,15 @@ open class Client<CONNECTION : Connection>(
driver.subscription.close()
MediaDriverConnectInfo(
// NOTE: pub/sub must be switched!
subscription = handshakeConnection.subscription,
publication = driver.publication,
subscriptionPort = driver.subscriptionPort,
publicationPort = driver.publicationPort,
subscriptionPort = 0,
publicationPort = driver.port,
streamId = driver.streamId,
sessionId = driver.sessionId,
sessionId = driver.remoteSessionId,
isReliable = driver.isReliable,
remoteAddress = driver.address,
remoteAddressString = MediaDriverConnection.connectionString(driver.address)
remoteAddressString = IP.toString(driver.address)
)
}
@ -784,25 +794,21 @@ open class Client<CONNECTION : Connection>(
logger.debug { "[$aeronLogInfo - ${handshake.connectKey}] Connection (${newConnection.id}) to $remoteAddressString done with handshake." }
// this forces the current thread to WAIT until poll system has started
val mutex = Mutex(locked = true)
// this forces the current thread to WAIT until the network poll system has started
val pollStartupLatch = CountDownLatch(1)
// have to make a new thread to listen for incoming data!
// SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
// these have to be in two SEPARATE actionDispatch.launch commands.... otherwise...
// if something inside-of notifyConnect is blocking or suspends, then polling will never happen!
actionDispatch.launch {
try {
mutex.unlock()
} catch (ignored: Exception) {}
val networkEventProcessor = Runnable {
pollStartupLatch.countDown()
val pollIdleStrategy = config.pollIdleStrategy.clone()
val pollIdleStrategy = config.pollIdleStrategy.cloneToNormal()
while (!isShutdown()) {
if (!newConnection.isClosedViaAeron()) {
// Polls the AERON media driver subscription channel for incoming messages
val pollCount = newConnection.pollSubscriptions()
val pollCount = newConnection.poll()
// 0 means we idle. >0 means reset and don't idle (because there are likely more poll events)
pollIdleStrategy.idle(pollCount)
@ -812,18 +818,21 @@ open class Client<CONNECTION : Connection>(
// event-loop is required, because we want to run this code AFTER the current coroutine has finished. This prevents
// odd race conditions when a client is restarted. Can only be run from inside another co-routine!
actionDispatch.eventLoop {
// actionDispatch.eventLoop {
// NOTE: We do not shutdown the client!! The client is only closed by explicitly calling `client.close()`
newConnection.close()
}
return@launch
// }
return@Runnable
}
}
}
config.networkInterfaceEventDispatcher.submit(networkEventProcessor)
pollStartupLatch.await()
// these have to be in two SEPARATE "runnables" otherwise...
// if something inside-of listenerManager.notifyConnect is blocking or suspends, then polling will never happen!
actionDispatch.launch {
mutex.withLock { }
lockStepForConnect.getAndSet(null)?.withLock { }
listenerManager.notifyConnect(newConnection)
}
@ -865,7 +874,7 @@ open class Client<CONNECTION : Connection>(
*
* @return true if the message was sent successfully, false if the connection has been closed
*/
suspend fun send(message: Any): Boolean {
fun send(message: Any): Boolean {
val c = connection0
return if (c != null) {
@ -877,17 +886,6 @@ open class Client<CONNECTION : Connection>(
}
}
/**
* Sends a message to the server, if the connection is closed for any reason, this returns false.
*
* @return true if the message was sent successfully, false if the connection has been closed
*/
fun sendBlocking(message: Any): Boolean {
return runBlocking {
send(message)
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*

View File

@ -75,28 +75,9 @@ class ServerConfiguration : dorkbox.network.Configuration() {
}
/**
* The IPC Publication ID is used to define what ID the server will send data on. The client IPC subscription ID must match this value.
* The IPC ID is used to define what ID the server will receive data on. The client IPC ID must match this value.
*/
var ipcPublicationId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* The IPC Subscription ID is used to define what ID the server will receive data on. The client IPC publication ID must match this value.
*/
var ipcSubscriptionId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkInterfaceEventDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Network Event Dispatcher", Thread.currentThread().threadGroup, Thread.NORM_PRIORITY, true))
var ipcId = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -132,20 +113,6 @@ class ServerConfiguration : dorkbox.network.Configuration() {
}
class ClientConfiguration : dorkbox.network.Configuration() {
/**
* Specify the UDP port to use. This port is used to establish client-server connections, and is from the
* perspective of the server
*
* This means that server-pub -> {{network}} -> client-sub
*
* Must be greater than 0
*/
var publicationPort: Int = 0
set(value) {
require(!contextDefined) { dorkbox.network.Configuration.errorMessage }
field = value
}
/**
* Validates the current configuration
*/
@ -153,9 +120,6 @@ class ClientConfiguration : dorkbox.network.Configuration() {
override fun validate() {
super.validate()
// have to do some basic validation of our configuration
require(publicationPort > 0) { "configuration port must be > 0" }
require(publicationPort < 65535) { "configuration port must be < 65535" }
}
}
@ -167,6 +131,17 @@ abstract class Configuration {
private var alreadyShownTips = false
}
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkInterfaceEventDispatcher: ExecutorService = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Network Event Dispatcher", Thread.currentThread().threadGroup, Thread.NORM_PRIORITY, true)
)
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Enables the ability to use the IPv4 network stack.
*/
@ -210,14 +185,15 @@ abstract class Configuration {
/**
* Specify the UDP subscription port to use. This port is used to establish client-server connections, and is from the
* perspective of the server.
* Specify the UDP port to use. This port is used to establish client-server connections.
* When used for the server, this is the subscription port, which will be listening for incoming connections
* When used for the client, this is the publication port, which is what port to connect to when establishing a connection
*
* This means that client-pub -> {{network}} -> server-sub
*
* Must be greater than 0
* Must be the value of an unsigned short and greater than 0
*/
var subscriptionPort: Int = 0
var port: Int = 0
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -568,8 +544,8 @@ abstract class Configuration {
}
}
require(subscriptionPort > 0) { "configuration controlPort must be > 0" }
require(subscriptionPort < 65535) { "configuration controlPort must be < 65535" }
require(port > 0) { "configuration controlPort must be > 0" }
require(port < 65535) { "configuration controlPort must be < 65535" }
require(networkMtuSize > 0) { "configuration networkMtuSize must be > 0" }
require(networkMtuSize < 9 * 1024) { "configuration networkMtuSize must be < ${9 * 1024}" }

View File

@ -1,30 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DuplicatedCode")
package dorkbox.network.aeron
import io.aeron.Publication
import io.aeron.Subscription
abstract class MediaDriverClient(subscriptionPort: Int,
streamId: Int, sessionId: Int, val localSessionId: Int,
connectionTimeoutSec: Int, isReliable: Boolean) :
MediaDriverConnection(subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
lateinit var subscription: Subscription
lateinit var publication: Publication
}

View File

@ -14,8 +14,10 @@
* limitations under the License.
*/
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection.Companion.uri
import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
@ -27,13 +29,19 @@ import java.util.concurrent.*
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 10 second to connect. This is IPC, it should connect fast
*/
internal open class ClientIpc_MediaDriver(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int,
localSessionId: Int) :
MediaDriverClient(streamIdSubscription, streamId, sessionId, localSessionId, 10, true) {
internal open class ClientIpcDriver(streamId: Int,
sessionId: Int,
localSessionId: Int) :
MediaDriverClient(
port = streamId,
streamId = streamId,
remoteSessionId = sessionId,
localSessionId = localSessionId,
connectionTimeoutSec = 10,
isReliable = true
) {
var success: Boolean = false
override val type = "ipc"
/**
@ -45,10 +53,10 @@ internal open class ClientIpc_MediaDriver(streamId: Int,
fun build(aeronDriver: AeronDriver, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri("ipc", sessionId)
val publicationUri = uri("ipc", remoteSessionId)
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uri("ipc", localSessionId)
val subscriptionUri = uri("ipc", 0)
if (logger.isTraceEnabled) {
logger.trace("IPC client pub URI: ${publicationUri.build()}")
@ -62,14 +70,14 @@ internal open class ClientIpc_MediaDriver(streamId: Int,
// For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions.
// ESPECIALLY if it is with the same streamID
val lingerTimeoutNs = aeronDriver.getLingerNs()
sleep(TimeUnit.NANOSECONDS.toMillis(lingerTimeoutNs))
// this check is in the "reconnect" logic
val publication = aeronDriver.addPublication(publicationUri, streamId)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSubscription)
val subscription = aeronDriver.addSubscription(subscriptionUri, localSessionId)
// always include the linger timeout, so we don't accidentally kill ourself by taking too long
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + lingerTimeoutNs
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs()
val startTime = System.nanoTime()
while (System.nanoTime() - startTime < timoutInNanos) {
@ -80,7 +88,6 @@ internal open class ClientIpc_MediaDriver(streamId: Int,
sleep(500L)
}
if (!success) {
subscription.close()
publication.close()
@ -97,9 +104,9 @@ internal open class ClientIpc_MediaDriver(streamId: Int,
override val info : String by lazy {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC connection established to [$streamIdSubscription|$streamId]"
"[$sessionId] IPC connection established to [$streamId|$subscriptionPort]"
} else {
"Connecting handshake to IPC [$streamIdSubscription|$streamId]"
"Connecting handshake to IPC [$streamId|$subscriptionPort]"
}
}

View File

@ -14,9 +14,10 @@
* limitations under the License.
*/
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import dorkbox.netUtil.IP
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection.Companion.uriEndpoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
@ -30,20 +31,16 @@ import java.util.concurrent.*
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
* A connection timeout of 0, means to wait forever
*/
internal class ClientUdp_MediaDriver(val address: InetAddress,
val publicationPort: Int,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
localSessionId: Int,
connectionTimeoutSec: Int = 0,
isReliable: Boolean) :
MediaDriverClient(subscriptionPort, streamId, sessionId, localSessionId, connectionTimeoutSec, isReliable) {
private val addressString: String by lazy {
IP.toString(address)
}
internal class ClientUdpDriver(val address: InetAddress, val addressString: String,
port: Int,
streamId: Int,
sessionId: Int,
localSessionId: Int,
connectionTimeoutSec: Int = 0,
isReliable: Boolean) :
MediaDriverClient(port, streamId, sessionId, localSessionId, connectionTimeoutSec, isReliable) {
var success: Boolean = false
override val type: String by lazy {
if (address is Inet4Address) {
"IPv4"
@ -52,15 +49,12 @@ internal class ClientUdp_MediaDriver(val address: InetAddress,
}
}
/**
* @throws ClientRetryException if we need to retry to connect
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
@Suppress("DuplicatedCode")
fun build(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = connectionString(address)
var success = false
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
@ -71,10 +65,10 @@ internal class ClientUdp_MediaDriver(val address: InetAddress,
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uriEndpoint("udp", sessionId, isReliable, "$aeronAddressString:$publicationPort")
val publicationUri = uriEndpoint("udp", remoteSessionId, isReliable, address, addressString, port)
// Create a subscription the given address and port, using the given stream ID.
val subscriptionUri = uriEndpoint("udp", localSessionId, isReliable, "$aeronAddressString:$subscriptionPort")
val subscriptionUri = uriEndpoint("udp", localSessionId, isReliable, address, addressString, 0)
if (logger.isTraceEnabled) {
@ -84,14 +78,13 @@ internal class ClientUdp_MediaDriver(val address: InetAddress,
// For publications, if we add them "too quickly" (faster than the 'linger' timeout), Aeron will throw exceptions.
// ESPECIALLY if it is with the same streamID. This was noticed as a problem with IPC
val lingerTimeoutNs = aeronDriver.getLingerNs()
val publication = aeronDriver.addPublication(publicationUri, streamId)
val subscription = aeronDriver.addSubscription(subscriptionUri, streamId)
// always include the linger timeout, so we don't accidentally kill ourself by taking too long
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + lingerTimeoutNs
val timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong()) + aeronDriver.getLingerNs()
val startTime = System.nanoTime()
while (System.nanoTime() - startTime < timoutInNanos) {
@ -119,9 +112,9 @@ internal class ClientUdp_MediaDriver(val address: InetAddress,
override val info: String by lazy {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"$addressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
"$addressString [$port|$subscriptionPort] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Connecting handshake to $addressString [$subscriptionPort|$publicationPort] [$streamId|*] (reliable:$isReliable)"
"Connecting handshake to $addressString [$port|$subscriptionPort] [$streamId|*] (reliable:$isReliable)"
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("DuplicatedCode")
package dorkbox.network.aeron.mediaDriver
import dorkbox.util.logger
import io.aeron.Publication
import io.aeron.Subscription
abstract class MediaDriverClient(val port: Int,
val streamId: Int,
val remoteSessionId: Int,
val localSessionId: Int,
val connectionTimeoutSec: Int,
val isReliable: Boolean) : MediaDriverConnection {
lateinit var subscription: Subscription
lateinit var publication: Publication
val subscriptionPort: Int by lazy {
if (this is ClientIpcDriver) {
localSessionId
} else {
val addressesAndPorts = subscription.localSocketAddresses()
if (addressesAndPorts.size > 1) {
logger().error { "Subscription ports for client is MORE than 1. This is 'ok', but we only support the use the first one!" }
}
val first = addressesAndPorts.first()
// split
val splitPoint = first.lastIndexOf(':')
val port = first.substring(splitPoint+1)
port.toInt()
}
}
}

View File

@ -1,4 +1,4 @@
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import io.aeron.Publication
import io.aeron.Subscription

View File

@ -15,34 +15,22 @@
*/
@file:Suppress("DuplicatedCode")
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import dorkbox.network.aeron.AeronDriver
import io.aeron.ChannelUriStringBuilder
import java.net.Inet4Address
import java.net.InetAddress
abstract class MediaDriverConnection(val subscriptionPort: Int,
val streamId: Int, val sessionId: Int,
val connectionTimeoutSec: Int, val isReliable: Boolean) {
interface MediaDriverConnection {
var success: Boolean = false
abstract val type: String
val type: String
// We don't use 'suspend' for these, because we have to pump events from a NORMAL thread. If there are any suspend points, there is
// the potential for a live-lock due to coroutine scheduling
val info : String
companion object {
fun connectionString(ipAddress: InetAddress): String {
return if (ipAddress is Inet4Address) {
ipAddress.hostAddress
} else {
// IPv6 requires the address to be bracketed by [...]
val host = ipAddress.hostAddress
if (host[0] == '[') {
host
} else {
"[${ipAddress.hostAddress}]"
}
}
}
fun uri(type: String, sessionId: Int, isReliable: Boolean? = null): ChannelUriStringBuilder {
val builder = ChannelUriStringBuilder().media(type)
if (isReliable != null) {
@ -56,16 +44,22 @@ abstract class MediaDriverConnection(val subscriptionPort: Int,
return builder
}
fun uriEndpoint(type: String, sessionId: Int, isReliable: Boolean, endpoint: String): ChannelUriStringBuilder {
fun uriEndpoint(type: String, sessionId: Int, isReliable: Boolean, address: InetAddress, addressString: String, port: Int): ChannelUriStringBuilder {
val builder = uri(type, sessionId, isReliable)
builder.endpoint(endpoint)
if (address is Inet4Address) {
builder.endpoint("$addressString:$port")
} else {
// IPv6 requires the address to be bracketed by [...]
if (addressString[0] == '[') {
builder.endpoint("$addressString:$port")
} else {
// there MUST be [] surrounding the IPv6 address for aeron to like it!
builder.endpoint("[$addressString]:$port")
}
}
return builder
}
}
// We don't use 'suspend' for these, because we have to pump events from a NORMAL thread. If there are any suspend points, there is
// the potential for a live-lock due to coroutine scheduling
abstract val info : String
}

View File

@ -15,14 +15,15 @@
*/
@file:Suppress("DuplicatedCode")
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import io.aeron.Subscription
abstract class MediaDriverServer(subscriptionPort: Int,
streamId: Int, sessionId: Int,
connectionTimeoutSec: Int, isReliable: Boolean) :
MediaDriverConnection(subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
abstract class MediaDriverServer(val port: Int,
val streamId: Int,
val sessionId: Int,
val connectionTimeoutSec: Int, val
isReliable: Boolean) : MediaDriverConnection {
lateinit var subscription: Subscription
}

View File

@ -14,19 +14,22 @@
* limitations under the License.
*/
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection.Companion.uri
import mu.KLogger
/**
* For a client, the streamId specified here MUST be manually flipped because they are in the perspective of the SERVER
* NOTE: IPC connection will ALWAYS have a timeout of 10 second to connect. This is IPC, it should connect fast
*/
internal open class ServerIpc_MediaDriver(streamId: Int,
val streamIdSubscription: Int,
sessionId: Int) :
internal open class ServerIpcDriver(streamId: Int,
sessionId: Int) :
MediaDriverServer(0, streamId, sessionId, 10, true) {
var success: Boolean = false
override val type = "ipc"
/**
@ -43,14 +46,14 @@ internal open class ServerIpc_MediaDriver(streamId: Int,
}
success = true
subscription = aeronDriver.addSubscription(subscriptionUri, streamIdSubscription)
subscription = aeronDriver.addSubscription(subscriptionUri, streamId)
}
override val info : String by lazy {
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"[$sessionId] IPC listening on [$streamIdSubscription] [$sessionId]"
"[$sessionId] IPC listening on [$streamId] [$sessionId]"
} else {
"Listening handshake on IPC [$streamIdSubscription] [$sessionId]"
"Listening handshake on IPC [$streamId] [$sessionId]"
}
}

View File

@ -14,11 +14,13 @@
* limitations under the License.
*/
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import dorkbox.netUtil.IP
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection.Companion.uriEndpoint
import mu.KLogger
import java.net.Inet4Address
import java.net.InetAddress
@ -27,21 +29,21 @@ import java.net.InetAddress
* For a client, the ports specified here MUST be manually flipped because they are in the perspective of the SERVER.
* A connection timeout of 0, means to wait forever
*/
internal open class ServerUdp_MediaDriver(val listenAddress: InetAddress,
subscriptionPort: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutSec: Int,
isReliable: Boolean) :
MediaDriverServer(subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
internal open class ServerUdpDriver(val listenAddress: InetAddress,
port: Int,
streamId: Int,
sessionId: Int,
connectionTimeoutSec: Int,
isReliable: Boolean) :
MediaDriverServer(port, streamId, sessionId, connectionTimeoutSec, isReliable) {
var success: Boolean = false
override val type = "udp"
fun build(aeronDriver: AeronDriver, logger: KLogger) {
val connectionString = connectionString(listenAddress)
// Create a subscription at the given address and port, using the given stream ID.
val subscriptionUri = uriEndpoint("udp", sessionId, isReliable, "$connectionString:$subscriptionPort")
val subscriptionUri = uriEndpoint("udp", sessionId, isReliable, listenAddress, IP.toString(listenAddress), port)
if (logger.isTraceEnabled) {
if (listenAddress is Inet4Address) {
@ -67,9 +69,9 @@ internal open class ServerUdp_MediaDriver(val listenAddress: InetAddress,
}
if (sessionId != AeronDriver.RESERVED_SESSION_ID_INVALID) {
"Listening on $address [$subscriptionPort] [$streamId|$sessionId] (reliable:$isReliable)"
"Listening on $address [$port] [$streamId|$sessionId] (reliable:$isReliable)"
} else {
"Listening handshake on $address [$subscriptionPort] [$streamId|*] (reliable:$isReliable)"
"Listening handshake on $address [$port] [$streamId|*] (reliable:$isReliable)"
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package dorkbox.network.aeron
package dorkbox.network.aeron.mediaDriver
import io.aeron.Publication
import java.net.InetAddress
@ -35,9 +35,9 @@ internal class UdpMediaDriverPairedConnection(
isReliable: Boolean,
val publication: Publication
) :
ServerUdp_MediaDriver(listenAddress, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
ServerUdpDriver(listenAddress, subscriptionPort, streamId, sessionId, connectionTimeoutSec, isReliable) {
override fun toString(): String {
return "$remoteAddressString [$subscriptionPort|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
return "$remoteAddressString [$port|$publicationPort] [$streamId|$sessionId] (reliable:$isReliable)"
}
}

View File

@ -15,7 +15,7 @@
*/
package dorkbox.network.connection
import dorkbox.network.aeron.MediaDriverConnectInfo
import dorkbox.network.aeron.mediaDriver.MediaDriverConnectInfo
data class ConnectionParams<CONNECTION : Connection>(
val endPoint: EndPoint<CONNECTION>,

View File

@ -188,7 +188,6 @@ internal class CryptoManagement(val logger: KLogger,
// NOTE: ALWAYS CALLED ON THE SAME THREAD! (from the server, mutually exclusive calls to decrypt)
fun encrypt(clientPublicKeyBytes: ByteArray,
publicationPort: Int,
subscriptionPort: Int,
connectionSessionId: Int,
connectionStreamId: Int,
@ -205,7 +204,6 @@ internal class CryptoManagement(val logger: KLogger,
cryptOutput.reset()
cryptOutput.writeInt(connectionSessionId)
cryptOutput.writeInt(connectionStreamId)
cryptOutput.writeInt(publicationPort)
cryptOutput.writeInt(subscriptionPort)
cryptOutput.writeInt(kryoRegDetails.size)
cryptOutput.writeBytes(kryoRegDetails)
@ -230,7 +228,6 @@ internal class CryptoManagement(val logger: KLogger,
val sessionId = cryptInput.readInt()
val streamId = cryptInput.readInt()
val publicationPort = cryptInput.readInt()
val subscriptionPort = cryptInput.readInt()
val regDetailsSize = cryptInput.readInt()
val regDetails = cryptInput.readBytes(regDetailsSize)
@ -238,8 +235,7 @@ internal class CryptoManagement(val logger: KLogger,
// now read data off
return ClientConnectionInfo(sessionId = sessionId,
streamId = streamId,
publicationPort = publicationPort,
subscriptionPort = subscriptionPort,
port = subscriptionPort,
publicKey = serverPublicKeyBytes,
kryoRegistrationDetails = regDetails)
} catch (e: Exception) {

View File

@ -21,7 +21,6 @@ import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.BacklogStat
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.connection.streaming.StreamingControl
import dorkbox.network.connection.streaming.StreamingData
import dorkbox.network.connection.streaming.StreamingManager
@ -137,8 +136,8 @@ internal constructor(val type: Class<*>,
private val handshakeKryo: KryoExtra<CONNECTION>
private val sendIdleStrategy: CoroutineIdleStrategy
private val pollIdleStrategy: CoroutineIdleStrategy
private val sendIdleStrategy: IdleStrategy
private val pollIdleStrategy: IdleStrategy
private val handshakeSendIdleStrategy: IdleStrategy
/**
@ -173,8 +172,8 @@ internal constructor(val type: Class<*>,
// serialization stuff
@Suppress("UNCHECKED_CAST")
serialization = config.serialization as Serialization<CONNECTION>
sendIdleStrategy = config.sendIdleStrategy.clone()
pollIdleStrategy = config.pollIdleStrategy.clone()
sendIdleStrategy = config.sendIdleStrategy.cloneToNormal()
pollIdleStrategy = config.pollIdleStrategy.cloneToNormal()
handshakeSendIdleStrategy = config.sendIdleStrategy.cloneToNormal()
handshakeKryo = serialization.initHandshakeKryo()
@ -192,15 +191,13 @@ internal constructor(val type: Class<*>,
throw e
}
if (type.javaClass == Server::class.java) {
rmiConnectionSupport = if (type.javaClass == Server::class.java) {
// server cannot "get" global RMI objects, only the client can
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization)
{ _, _, _ ->
RmiManagerConnections(logger, responseManager, listenerManager, serialization) { _, _, _ ->
throw IllegalAccessException("Global RMI access is only possible from a Client connection!")
}
} else {
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization)
{ connection, objectId, interfaceClass ->
RmiManagerConnections(logger, responseManager, listenerManager, serialization) { connection, objectId, interfaceClass ->
return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass)
}
}
@ -381,13 +378,15 @@ internal constructor(val type: Class<*>,
internal fun writeHandshakeMessage(publication: Publication, aeronLogInfo: String, message: HandshakeMessage) {
// The handshake sessionId IS NOT globally unique
logger.trace { "[$aeronLogInfo - ${message.connectKey}] send HS: $message" }
var retryAttempts = 0
try {
val buffer = handshakeKryo.write(message)
val objectSize = buffer.position()
val internalBuffer = buffer.internalBuffer
var timeoutInNanos = 0L
var startTime = 0L
var result: Long
while (true) {
result = publication.offer(internalBuffer, 0, objectSize)
@ -403,8 +402,12 @@ internal constructor(val type: Class<*>,
* don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
*/
if (result == Publication.NOT_CONNECTED) {
if (retryAttempts++ < sendDataRetryMax) {
if (timeoutInNanos == 0L) {
timeoutInNanos = (aeronDriver.getLingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer
startTime = System.nanoTime()
}
if (System.nanoTime() - startTime < timeoutInNanos) {
// NOTE: Handlers are called on the client conductor thread. The client conductor thread expects handlers to do safe
// publication of any state to other threads and not be long running or re-entrant with the client.
// on close, the publication CAN linger (in case a client goes away, and then comes back)
@ -415,14 +418,20 @@ internal constructor(val type: Class<*>,
// we should retry.
handshakeSendIdleStrategy.idle()
continue
} else {
} else if (!publication.isClosed) {
// more critical error sending the message. we shouldn't retry or anything.
// this exception will be a ClientException or a ServerException
val exception = newException("[$aeronLogInfo] Error sending message. (Retry count more than ${retryAttempts-1} ${errorCodeName(result)})")
val exception = newException(
"[$aeronLogInfo] Error sending message. (Connection in non-connected state longer than linger timeout ${errorCodeName(result)})"
)
ListenerManager.cleanStackTraceInternal(exception)
listenerManager.notifyError(exception)
throw exception
}
else {
// publication was actually closed, so no bother throwing an error
return
}
}
/**
@ -505,7 +514,7 @@ internal constructor(val type: Class<*>,
try {
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = serialization.readMessage(buffer, offset, length, connection)
logger.error { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
logger.trace { "[${header.sessionId()}] received: ${message?.javaClass?.simpleName} $message" }
// the REPEATED usage of wrapping methods below is because Streaming messages have to intercept date BEFORE it goes to a coroutine
@ -599,7 +608,7 @@ internal constructor(val type: Class<*>,
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
@Suppress("DuplicatedCode", "UNCHECKED_CAST")
internal suspend fun send(message: Any, publication: Publication, connection: Connection): Boolean {
internal fun send(message: Any, publication: Publication, connection: Connection): Boolean {
// The handshake sessionId IS NOT globally unique
logger.trace {
"[${publication.sessionId()}] send: ${message.javaClass.simpleName} : $message"
@ -658,11 +667,8 @@ internal constructor(val type: Class<*>,
var startTime = 0L
var result: Long
var retryAttempts = 0
while (true) {
result = publication.offer(internalBuffer, offset, objectSize)
logger.error { "SEND DATA: $result" }
if (result >= 0) {
// success!
return true
@ -675,13 +681,18 @@ internal constructor(val type: Class<*>,
* don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
*/
if (result == Publication.NOT_CONNECTED) {
if (retryAttempts++ < sendDataRetryMax) {
if (timeoutInNanos == 0L) {
timeoutInNanos = (aeronDriver.getLingerNs() * 1.2).toLong() // close enough. Just needs to be slightly longer
startTime = System.nanoTime()
}
if (System.nanoTime() - startTime < timeoutInNanos) {
// we should retry.
sendIdleStrategy.idle()
continue
} else {
} else if (!publication.isClosed) {
// more critical error sending the message. we shouldn't retry or anything.
val errorMessage = "[${publication.sessionId()}] Error sending message. (Retry count more than ${retryAttempts-1} ${errorCodeName(result)})"
val errorMessage = "[${publication.sessionId()}] Error sending message. (Connection in non-connected state longer than linger timeout. ${errorCodeName(result)})"
// either client or server. No other choices. We create an exception, because it's more useful!
val exception = newException(errorMessage)
@ -691,6 +702,9 @@ internal constructor(val type: Class<*>,
// where we see who is calling "send()"
ListenerManager.cleanStackTrace(exception, 5)
return false
} else {
// publication was actually closed, so no bother throwing an error
return false
}
}
@ -711,6 +725,7 @@ internal constructor(val type: Class<*>,
continue
}
if (result == Publication.CLOSED && connection.isClosedViaAeron()) {
// this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
@ -796,32 +811,32 @@ internal constructor(val type: Class<*>,
if (shutdown.compareAndSet(expect = false, update = true)) {
logger.info { "Shutting down..." }
runBlocking {
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
val enableRemove = type == Client::class.java
connections.forEach {
logger.info { "[${it.id}/${it.streamId}] Closing connection" }
it.close(enableRemove, true)
}
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
val enableRemove = type == Client::class.java
connections.forEach {
logger.info { "[${it.id}/${it.streamId}] Closing connection" }
it.close(enableRemove, true)
}
runBlocking {
// Connections are closed first, because we want to make sure that no RMI messages can be received
// when we close the RMI support objects (in which case, weird - but harmless - errors show up)
// this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via and RMI method)
responseManager.close()
// the storage is closed via this as well.
storage.close()
close0()
aeronDriver.close()
// if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
shutdownLatch.countDown()
}
// the storage is closed via this as well.
storage.close()
close0()
aeronDriver.close()
// if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
shutdownLatch.countDown()
logger.info { "Done shutting down..." }
}
}

View File

@ -204,7 +204,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
}
}
private suspend fun sendFailMessageAndThrow(
private fun sendFailMessageAndThrow(
e: Exception,
streamSessionId: Long,
publication: Publication,
@ -244,7 +244,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
* @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
suspend fun send(
fun send(
publication: Publication,
internalBuffer: MutableDirectBuffer,
objectSize: Int,

View File

@ -15,8 +15,7 @@
*/
package dorkbox.network.handshake
internal class ClientConnectionInfo(val subscriptionPort: Int = 0,
val publicationPort: Int = 0,
internal class ClientConnectionInfo(val port: Int = 0,
val sessionId: Int,
val streamId: Int = 0,
val publicKey: ByteArray = ByteArray(0),

View File

@ -16,7 +16,7 @@
package dorkbox.network.handshake
import dorkbox.network.Client
import dorkbox.network.aeron.MediaDriverClient
import dorkbox.network.aeron.mediaDriver.MediaDriverClient
import dorkbox.network.connection.Connection
import dorkbox.network.connection.CryptoManagement
import dorkbox.network.connection.ListenerManager
@ -115,22 +115,20 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
HandshakeMessage.HELLO_ACK_IPC -> {
// The message was intended for this client. Try to parse it as one of the available message types.
// this message is ENCRYPTED!
// this message is NOT-ENCRYPTED!
val cryptInput = crypto.cryptInput
if (registrationData != null) {
cryptInput.buffer = registrationData
val sessId = cryptInput.readInt()
val streamSubId = cryptInput.readInt()
val streamPubId = cryptInput.readInt()
val regDetailsSize = cryptInput.readInt()
val regDetails = cryptInput.readBytes(regDetailsSize)
// now read data off
connectionHelloInfo = ClientConnectionInfo(sessionId = sessId,
subscriptionPort = streamSubId,
publicationPort = streamPubId,
port = streamPubId,
kryoRegistrationDetails = regDetails)
} else {
failedException = ClientRejectedException("[$aeronLogInfo - ${message.connectKey}] canceled handshake for message without registration data")
@ -165,7 +163,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
connectKey = getSafeConnectKey()
val publicKey = endPoint.storage.getPublicKey()!!
val aeronLogInfo = "${handshakeConnection.sessionId}/${handshakeConnection.streamId}"
val aeronLogInfo = "${handshakeConnection.remoteSessionId}/${handshakeConnection.streamId}"
// Send the one-time pad to the server.
val publication = handshakeConnection.publication
@ -230,10 +228,10 @@ internal class ClientHandshake<CONNECTION: Connection>(
// called from the connect thread
fun done(handshakeConnection: MediaDriverClient, connectionTimeoutSec: Int) {
val registrationMessage = HandshakeMessage.doneFromClient(connectKey,
handshakeConnection.subscriptionPort,
handshakeConnection.subscription.streamId())
handshakeConnection.subscriptionPort,
handshakeConnection.subscription.streamId())
val aeronLogInfo = "${handshakeConnection.sessionId}/${handshakeConnection.streamId}"
val aeronLogInfo = "${handshakeConnection.remoteSessionId}/${handshakeConnection.streamId}"
// Send the done message to the server.
try {

View File

@ -15,14 +15,13 @@
*/
package dorkbox.network.handshake
import dorkbox.netUtil.IP
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.MediaDriverConnectInfo
import dorkbox.network.aeron.MediaDriverConnection
import dorkbox.network.aeron.ServerIpc_MediaDriver
import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.aeron.mediaDriver.MediaDriverConnectInfo
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection
import dorkbox.network.aeron.mediaDriver.ServerIpcDriver
import dorkbox.network.aeron.mediaDriver.UdpMediaDriverPairedConnection
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.ListenerManager
@ -313,9 +312,8 @@ internal class ServerHandshake<CONNECTION : Connection>(
// create a new connection. The session ID is encrypted.
try {
// Create a subscription at the given address and port, using the given stream ID.
val driver = ServerIpc_MediaDriver(streamId = connectionStreamPubId,
streamIdSubscription = connectionStreamSubId,
sessionId = connectionSessionId)
val driver = ServerIpcDriver(streamId = connectionStreamSubId,
sessionId = connectionSessionId)
driver.build(aeronDriver, logger)
// create a new publication for the connection (since the handshake ALWAYS closes the current publication)
@ -325,8 +323,8 @@ internal class ServerHandshake<CONNECTION : Connection>(
val clientConnection = MediaDriverConnectInfo(
publication = clientPublication,
subscription = driver.subscription,
subscriptionPort = driver.streamIdSubscription,
publicationPort = driver.streamId,
subscriptionPort = driver.streamId,
publicationPort = message.subscriptionPort,
streamId = 0, // this is because with IPC, we have stream sub/pub (which are replaced as port sub/pub)
sessionId = driver.sessionId,
isReliable = driver.isReliable,
@ -360,7 +358,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
cryptOutput.reset()
cryptOutput.writeInt(connectionSessionId)
cryptOutput.writeInt(connectionStreamSubId)
cryptOutput.writeInt(connectionStreamPubId)
val regDetails = serialization.getKryoRegistrationDetails()
cryptOutput.writeInt(regDetails.size)
@ -391,7 +388,8 @@ internal class ServerHandshake<CONNECTION : Connection>(
fun processUdpHandshakeMessageServer(
server: Server<CONNECTION>,
handshakePublication: Publication,
remoteIpAndPort: String,
clientAddress: InetAddress,
clientAddressString: String,
isReliable: Boolean,
message: HandshakeMessage,
aeronDriver: AeronDriver,
@ -400,20 +398,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
logger: KLogger
) {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
// val port = remoteIpAndPort.substring(splitPoint+1)
// this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IP.toAddress(clientAddressString)
if (clientAddress == null) {
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid IP address!" }
return
}
// Manage the Handshake state
if (!validateMessageTypeAndDoPending(
server, server.actionDispatch, handshakePublication, message,
@ -422,8 +406,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
return
}
val clientPublicKeyBytes = message.publicKey
val validateRemoteAddress: PublicKeyValidationState
val serialization = config.serialization
@ -491,7 +473,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
// the pub/sub do not necessarily have to be the same. They can be ANY port
val publicationPort = message.subscriptionPort
val subscriptionPort = config.subscriptionPort
val subscriptionPort = config.port
// create a new connection. The session ID is encrypted.
@ -507,7 +489,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
}
// create a new publication for the connection (since the handshake ALWAYS closes the current publication)
val publicationUri = MediaDriverConnection.uriEndpoint("udp", message.sessionId, isReliable, "$clientAddressString:${message.subscriptionPort}")
val publicationUri = MediaDriverConnection.uriEndpoint("udp", message.sessionId, isReliable, clientAddress, clientAddressString, message.subscriptionPort)
val clientPublication = aeronDriver.addPublication(publicationUri, message.streamId)
val driver = UdpMediaDriverPairedConnection(
@ -529,7 +511,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
val clientConnection = MediaDriverConnectInfo(
publication = driver.publication,
subscription = driver.subscription,
subscriptionPort = driver.subscriptionPort,
subscriptionPort = driver.port,
publicationPort = publicationPort,
streamId = driver.streamId,
sessionId = driver.sessionId,
@ -574,7 +556,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
// now create the encrypted payload, using ECDH
successMessage.registrationData = server.crypto.encrypt(clientPublicKeyBytes!!,
publicationPort,
subscriptionPort,
connectionSessionId,
connectionStreamId,

View File

@ -2,13 +2,14 @@
package dorkbox.network.handshake
import dorkbox.netUtil.IP
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.MediaDriverConnection
import dorkbox.network.aeron.ServerIpc_MediaDriver
import dorkbox.network.aeron.ServerUdp_MediaDriver
import dorkbox.network.aeron.mediaDriver.MediaDriverConnection
import dorkbox.network.aeron.mediaDriver.ServerIpcDriver
import dorkbox.network.aeron.mediaDriver.ServerUdpDriver
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import io.aeron.FragmentAssembler
@ -87,12 +88,19 @@ internal object ServerHandshakePollers {
if (message !is HandshakeMessage) {
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid connection request" }
} else {
// this should never be null, because we are feeding it a valid IP address from aeron
val clientAddress = IP.toAddress(clientAddressString)
if (clientAddress == null) {
logger.error { "[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid IP address!" }
return
}
// we create a NEW publication for the handshake, which connects directly to the client handshake subscription
val publicationUri = MediaDriverConnection.uriEndpoint("udp", message.sessionId, isReliable, "$clientAddressString:${message.subscriptionPort}")
val publicationUri = MediaDriverConnection.uriEndpoint("udp", message.sessionId, isReliable, clientAddress, clientAddressString, message.subscriptionPort)
val publication = aeronDriver.addPublication(publicationUri, message.streamId)
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, isReliable, message,
server, publication, clientAddress, clientAddressString, isReliable, message,
aeronDriver, aeronLogInfo, isIpv6Wildcard,
connectionFunc, logger
)
@ -112,9 +120,8 @@ internal object ServerHandshakePollers {
val connectionFunc = server.connectionFunc
val poller = if (config.enableIpc) {
val driver = ServerIpc_MediaDriver(
streamIdSubscription = config.ipcSubscriptionId,
streamId = config.ipcPublicationId,
val driver = ServerIpcDriver(
streamId = config.ipcId,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID
)
driver.build(aeronDriver, logger)
@ -155,9 +162,9 @@ internal object ServerHandshakePollers {
val isReliable = config.isReliable
val poller = if (server.canUseIPv4) {
val driver = ServerUdp_MediaDriver(
val driver = ServerUdpDriver(
listenAddress = server.listenIPv4Address!!,
subscriptionPort = config.subscriptionPort,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,
@ -209,9 +216,9 @@ internal object ServerHandshakePollers {
val isReliable = config.isReliable
val poller = if (server.canUseIPv6) {
val driver = ServerUdp_MediaDriver(
val driver = ServerUdpDriver(
listenAddress = server.listenIPv6Address!!,
subscriptionPort = config.subscriptionPort,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,
@ -261,9 +268,9 @@ internal object ServerHandshakePollers {
val connectionFunc = server.connectionFunc
val isReliable = config.isReliable
val driver = ServerUdp_MediaDriver(
val driver = ServerUdpDriver(
listenAddress = server.listenIPv6Address!!,
subscriptionPort = config.subscriptionPort,
port = config.port,
streamId = AeronDriver.UDP_HANDSHAKE_STREAM_ID,
sessionId = AeronDriver.RESERVED_SESSION_ID_INVALID,
connectionTimeoutSec = config.connectionCloseTimeoutInSeconds,

View File

@ -108,11 +108,13 @@ object AeronClient {
fun main(args: Array<String>) {
val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.subscriptionPort = 4000
configuration.publicationPort = 2000
// configuration.enableIpc = true
configuration.enableIpc = false
configuration.uniqueAeronDirectory = true
configuration.port = 2000
configuration.enableIpc = true
// configuration.enableIpc = false
// configuration.enableIPv4 = false
// configuration.enableIPv6 = true
// configuration.uniqueAeronDirectory = true
val client = Client<Connection>(configuration)

View File

@ -89,10 +89,15 @@ object AeronServer {
val configuration = ServerConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.listenIpAddress = "127.0.0.1"
configuration.subscriptionPort = 2000
configuration.maxClientCount = 5
configuration.port = 2000
configuration.maxClientCount = 50
configuration.enableIpc = true
configuration.maxConnectionsPerIpAddress = 5
// configuration.enableIpc = false
configuration.enableIPv4 = false
configuration.enableIPv6 = false
configuration.maxConnectionsPerIpAddress = 50
val server: Server<*> = Server<Connection>(configuration)

View File

@ -36,7 +36,10 @@ package dorkboxTest.network
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.encoder.PatternLayoutEncoder
import ch.qos.logback.classic.joran.JoranConfigurator
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.ConsoleAppender
import dorkbox.network.Client
import dorkbox.network.ClientConfiguration
import dorkbox.network.Configuration
@ -93,9 +96,9 @@ abstract class BaseTest {
// }
// }
// setLogLevel(Level.TRACE)
setLogLevel(Level.TRACE)
// setLogLevel(Level.ERROR)
setLogLevel(Level.DEBUG)
// setLogLevel(Level.DEBUG)
// we want our entropy generation to be simple (ie, no user interaction to generate)
try {
@ -109,8 +112,7 @@ abstract class BaseTest {
val configuration = ClientConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.subscriptionPort = 2201
configuration.publicationPort = 2200
configuration.port = 2200
configuration.enableIpc = false
@ -121,12 +123,12 @@ abstract class BaseTest {
fun serverConfig(block: ServerConfiguration.() -> Unit = {}): ServerConfiguration {
val configuration = ServerConfiguration()
configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk!
configuration.subscriptionPort = 2200
configuration.port = 2200
configuration.enableIpc = false
configuration.maxClientCount = 5
configuration.maxConnectionsPerIpAddress = 5
configuration.maxClientCount = 50
configuration.maxConnectionsPerIpAddress = 50
block(configuration)
return configuration
@ -141,30 +143,31 @@ abstract class BaseTest {
rootLogger.level = level
val context = rootLogger.loggerContext
context.reset() // override default configuration
val jc = JoranConfigurator()
jc.context = context
jc.doConfigure(File("logback.xml").absoluteFile)
context.reset() // override default configuration
// we only want error messages
val nettyLogger = LoggerFactory.getLogger("io.netty") as Logger
nettyLogger.level = Level.ERROR
// we only want error messages
val kryoLogger = LoggerFactory.getLogger("com.esotericsoftware") as Logger
kryoLogger.level = Level.ERROR
// val encoder = PatternLayoutEncoder()
// encoder.context = context
// encoder.pattern = "%date{HH:mm:ss.SSS} %-5level [%logger{35}] %msg%n"
// encoder.start()
//
// val consoleAppender = ConsoleAppender<ILoggingEvent>()
// consoleAppender.context = context
// consoleAppender.encoder = encoder
// consoleAppender.start()
//
// rootLogger.addAppender(consoleAppender)
// context.getLogger(Server::class.simpleName).trace("TESTING")
// context.getLogger(Client::class.simpleName).trace("TESTING")
val encoder = PatternLayoutEncoder()
encoder.context = context
encoder.pattern = "%date{HH:mm:ss.SSS} %-5level [%logger{35}] %msg%n"
encoder.start()
val consoleAppender = ConsoleAppender<ILoggingEvent>()
consoleAppender.context = context
consoleAppender.encoder = encoder
consoleAppender.start()
rootLogger.addAppender(consoleAppender)
}
}

View File

@ -61,7 +61,7 @@ class DisconnectReconnectTest : BaseTest() {
}
waitForThreads(0)
waitForThreads()
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
@ -70,7 +70,7 @@ class DisconnectReconnectTest : BaseTest() {
@Test
fun reconnectClientViaClientClose() {
run {
val configuration = serverConfig() {
val configuration = serverConfig {
uniqueAeronDirectory = true
}
@ -117,7 +117,7 @@ class DisconnectReconnectTest : BaseTest() {
}
waitForThreads(0)
waitForThreads()
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
@ -133,6 +133,7 @@ class DisconnectReconnectTest : BaseTest() {
}
suspend fun close(connection: Connection) {
connection.logger.error { "PRE CLOSE MESSAGE!" }
connection.close()
}
}
@ -169,7 +170,7 @@ class DisconnectReconnectTest : BaseTest() {
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onConnect {
client.onInit {
rmi.save(CloseImpl(), CLOSE_ID)
}
@ -195,7 +196,7 @@ class DisconnectReconnectTest : BaseTest() {
}
waitForThreads(0)
waitForThreads(AUTO_FAIL_TIMEOUT*10)
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
@ -312,7 +313,7 @@ class DisconnectReconnectTest : BaseTest() {
}
waitForThreads(0)
waitForThreads()
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)

View File

@ -80,6 +80,7 @@ class ListenerTest : BaseTest() {
// standard listener
server.onMessage<String> { message ->
logger.error ("server string message")
// should be called
check()
send(message)
@ -89,23 +90,28 @@ class ListenerTest : BaseTest() {
server.onMessage<Any> {
// should be called!
serverOnMessage.value = true
logger.error ("server any message")
}
// standard connect check
server.onConnect {
logger.error ("server connect")
serverConnect.value = true
onMessage<Any> {
logger.error ("server connection any message")
serverConnectionOnMessage.getAndIncrement()
}
onDisconnect {
logger.error ("server connection disconnect")
serverDisconnectMessage.getAndIncrement()
}
}
// standard listener disconnect check
server.onDisconnect {
logger.error ("server disconnect")
serverDisconnect.value = true
}
@ -122,10 +128,19 @@ class ListenerTest : BaseTest() {
client.onConnect {
logger.error { "client connect 1" }
send(origString) // 20 a's
}
// standard connect check
client.onConnect {
logger.error { "client connect 2" }
clientConnect.value = true
}
client.onMessage<String> { message ->
logger.error { "client string message" }
if (origString != message) {
checkFail2.value = true
System.err.println("original string not equal to the string received")
@ -140,21 +155,16 @@ class ListenerTest : BaseTest() {
}
}
// standard connect check
client.onConnect {
clientConnect.value = true
}
// standard listener disconnect check
client.onDisconnect {
logger.error ("client disconnect")
clientDisconnect.value = true
}
client.connect(LOCALHOST)
waitForThreads(0)
waitForThreads()
// +1 BECAUSE we are `getAndIncrement` for each check earlier
val limitCheck = limit+1

View File

@ -15,7 +15,7 @@ import java.text.SimpleDateFormat
import java.util.*
class MultiClientTest : BaseTest() {
private val totalCount = 10
private val totalCount = 2
private val clientConnectCount = atomic(0)
private val serverConnectCount = atomic(0)
private val disconnectCount = atomic(0)
@ -31,7 +31,6 @@ class MultiClientTest : BaseTest() {
val config = clientConfig()
config.enableIPv6 = false
config.uniqueAeronDirectory = true
config.subscriptionPort += i
val client: Client<Connection> = Client(config, "Client$i")
client.onConnect {
@ -89,7 +88,7 @@ class MultiClientTest : BaseTest() {
}
}
waitForThreads(0) {
waitForThreads() {
outputStats(server)
}

View File

@ -63,7 +63,7 @@ class MultipleServerTest : BaseTest() {
val offset = count * portOffset
val configuration = serverConfig()
configuration.subscriptionPort += offset
configuration.port += offset
configuration.aeronDirectory = serverAeronDir
configuration.enableIpc = false
@ -95,8 +95,7 @@ class MultipleServerTest : BaseTest() {
val offset = count * portOffset
val configuration = clientConfig()
configuration.subscriptionPort += offset
configuration.publicationPort += offset
configuration.port += offset
configuration.aeronDirectory = clientAeronDir
configuration.enableIpc = false
@ -138,7 +137,7 @@ class MultipleServerTest : BaseTest() {
val offset = count * portOffset
val configuration = serverConfig()
configuration.subscriptionPort += offset
configuration.port += offset
configuration.aeronDirectory = serverAeronDir
configuration.enableIpc = true
@ -170,8 +169,7 @@ class MultipleServerTest : BaseTest() {
val offset = count * portOffset
val configuration = clientConfig()
configuration.subscriptionPort += offset
configuration.publicationPort += offset
configuration.port += offset
configuration.aeronDirectory = clientAeronDir
configuration.enableIpc = true

View File

@ -52,6 +52,6 @@ class StreamingTest : BaseTest() {
client.connect(LOCALHOST)
}
waitForThreads(0)
waitForThreads()
}
}

View File

@ -366,7 +366,7 @@ class RmiNestedTest : BaseTest() {
private class TestObjectAnnotImpl : TestObject {
@Transient
private val ID = idCounter.getAndIncrement()
private val id = idCounter.getAndIncrement()
private val otherObject: OtherObject = OtherObjectImpl()
@ -402,7 +402,7 @@ class RmiNestedTest : BaseTest() {
}
override fun hashCode(): Int {
return ID
return id
}
}
@ -421,5 +421,17 @@ class RmiNestedTest : BaseTest() {
override fun hashCode(): Int {
return ID
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as OtherObjectImpl
if (ID != other.ID) return false
if (aFloat != other.aFloat) return false
return true
}
}
}