code cleanup

Robinson committed 2023-06-25 12:21:21 +02:00
parent 4b511b2615
commit 88f082097a
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 190 additions and 326 deletions

Client.kt

@@ -29,26 +29,27 @@ import dorkbox.network.connection.Connection
 import dorkbox.network.connection.ConnectionParams
 import dorkbox.network.connection.EndPoint
 import dorkbox.network.connection.EventDispatcher
-import dorkbox.network.connection.EventDispatcher.Companion.EVENT
 import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress
 import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
 import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
 import dorkbox.network.connection.PublicKeyValidationState
 import dorkbox.network.exceptions.ClientException
+import dorkbox.network.exceptions.ClientHandshakeException
 import dorkbox.network.exceptions.ClientRejectedException
 import dorkbox.network.exceptions.ClientRetryException
 import dorkbox.network.exceptions.ClientShutdownException
 import dorkbox.network.exceptions.ClientTimedOutException
 import dorkbox.network.exceptions.ServerException
+import dorkbox.network.exceptions.TransmitException
 import dorkbox.network.handshake.ClientHandshake
 import dorkbox.network.ping.Ping
-import kotlinx.atomicfu.atomic
+import dorkbox.util.sync.CountDownLatch
 import kotlinx.coroutines.runBlocking
 import mu.KotlinLogging
 import java.net.Inet4Address
 import java.net.Inet6Address
 import java.net.InetAddress
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*

 /**
  * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
@@ -143,9 +144,7 @@ open class Client<CONNECTION : Connection>(
         val timeout = TimeUnit.SECONDS.toMillis(configuration.connectionCloseTimeoutInSeconds.toLong() * 2)
         val logger = KotlinLogging.logger(Client::class.java.simpleName)

-        AeronDriver(configuration, logger).use {
-            it.ensureStopped(timeout, 500)
-        }
+        AeronDriver.ensureStopped(configuration, logger, timeout)
     }

     /**
@@ -157,9 +156,7 @@ open class Client<CONNECTION : Connection>(
      */
     fun isRunning(configuration: Configuration): Boolean = runBlocking {
         val logger = KotlinLogging.logger(Client::class.java.simpleName)

-        AeronDriver(configuration, logger).use {
-            it.isRunning()
-        }
+        AeronDriver.isRunning(configuration, logger)
     }

     init {
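Note: the two hunks above replace the old pattern of spinning up a throwaway AeronDriver inside use {} (just to probe or stop the media driver) with static helpers on AeronDriver. A minimal usage sketch, assuming the helper signatures visible in this diff and that the surrounding companion functions are reachable as Client.isRunning(...) / Client.ensureStopped(...):

    import dorkbox.network.Client
    import dorkbox.network.ClientConfiguration

    fun main() {
        val config = ClientConfiguration()

        // probes the media driver state without constructing (and having to close) a driver instance
        if (Client.isRunning(config)) {
            // blocks up to 2x the connection-close timeout while the driver shuts down
            Client.ensureStopped(config)
        }
    }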
@@ -168,11 +165,6 @@ open class Client<CONNECTION : Connection>(
         }
     }

-    /**
-     * The UUID is a unique, in-memory instance that is created on object construction
-     */
-    val uuid = RandomBasedGenerator(CryptoManagement.secureRandom).generate()
-
     /**
      * The network or IPC address for the client to connect to.
      *
@@ -199,18 +191,10 @@ open class Client<CONNECTION : Connection>(
     @Volatile
     private var slowDownForException = false

-    @Volatile
-    private var isConnected = false
-
     // is valid when there is a connection to the server, otherwise it is null
-    @Volatile
     private var connection0: CONNECTION? = null

-    // This is set by the client so if there is a "connect()" call in the disconnect callback, we can have proper
-    // lock-stop ordering for how disconnect and connect work with each-other
-    // GUARANTEE that the callbacks for a NEW connect happen AFTER the previous 'onDisconnect' is finished.
-    // a CDL is used because it doesn't matter the order in which it's called (as it will always ensure it's correct)
-    private val disconnectInProgress = atomic<dorkbox.util.sync.CountDownLatch?>(null)
-
     final override fun newException(message: String, cause: Throwable?): Throwable {
         return ClientException(message, cause)
     }
@@ -259,23 +243,7 @@ open class Client<CONNECTION : Connection>(
                 reliable = reliable)
     }

-    /**
-     * Will attempt to connect to the server via IPC, with a default 30 second connection timeout and will block until completed.
-     *
-     * @param connectionTimeoutSec wait for x seconds. 0 will wait indefinitely.
-     *
-     * @throws IllegalArgumentException if the remote address is invalid
-     * @throws ClientTimedOutException if the client is unable to connect in x amount of time
-     * @throws ClientRejectedException if the client connection is rejected
-     */
-    @Suppress("DuplicatedCode")
-    fun connectIpc(connectionTimeoutSec: Int = 30) = runBlocking {
-        connect(remoteAddress = null, // required!
-                remoteAddressString = IPC_NAME,
-                remoteAddressPrettyString = IPC_NAME,
-                connectionTimeoutSec = connectionTimeoutSec)
-    }
+    // TODO:: the port should be part of the connect function!

     /**
      * Will attempt to connect to the server, with a default 30 second connection timeout and will block until completed.
      *
@@ -335,16 +303,19 @@ open class Client<CONNECTION : Connection>(
         when {
             // this is default IPC settings
-            remoteAddress.isEmpty() && config.enableIpc -> {
-                connectIpc(connectionTimeoutSec = connectionTimeoutSec)
+            remoteAddress.isEmpty() && config.enableIpc -> runBlocking {
+                connect(remoteAddress = null, // required!
+                        remoteAddressString = IPC_NAME,
+                        remoteAddressPrettyString = IPC_NAME,
+                        connectionTimeoutSec = connectionTimeoutSec)
             }

             // IPv6 takes precedence ONLY if it's enabled manually
-            config.enableIPv6 -> connect(ResolvedAddressTypes.IPV6_ONLY)
-            config.enableIPv4 -> connect(ResolvedAddressTypes.IPV4_ONLY)
-            IPv4.isPreferred -> connect(ResolvedAddressTypes.IPV4_PREFERRED)
-            IPv6.isPreferred -> connect(ResolvedAddressTypes.IPV6_PREFERRED)
-            else -> connect(ResolvedAddressTypes.IPV4_PREFERRED)
+            config.enableIPv6 -> { connect(ResolvedAddressTypes.IPV6_ONLY) }
+            config.enableIPv4 -> { connect(ResolvedAddressTypes.IPV4_ONLY) }
+            IPv4.isPreferred -> { connect(ResolvedAddressTypes.IPV4_PREFERRED) }
+            IPv6.isPreferred -> { connect(ResolvedAddressTypes.IPV6_PREFERRED) }
+            else -> { connect(ResolvedAddressTypes.IPV4_PREFERRED) }
         }
     }
@@ -384,16 +355,13 @@ open class Client<CONNECTION : Connection>(
         connectionTimeoutSec: Int = 30,
         reliable: Boolean = true)
     {
-        // on the client, we must GUARANTEE that the disconnect completes before NEW connect begins.
-        if (!disconnectInProgress.compareAndSet(null, dorkbox.util.sync.CountDownLatch(1))) {
-            val disconnectCDL = disconnectInProgress.value!!
-            logger.debug { "Redispatching connect request!" }
-            EventDispatcher.launch(EVENT.CONNECT) {
-                logger.debug { "Redispatch connect request started!" }
-                disconnectCDL.await(config.connectionCloseTimeoutInSeconds.toLong(), TimeUnit.SECONDS)
+        // NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
+
+        // on the client, we must GUARANTEE that the disconnect/close completes before NEW connect begins.
+        // we will know this if we are running inside an INTERNAL dispatch that is NOT the connect dispatcher!
+        val currentDispatcher = EventDispatcher.getCurrentEvent()
+        if (currentDispatcher != null && currentDispatcher != EventDispatcher.CONNECT) {
+            EventDispatcher.CONNECT.launch {
                 connect(remoteAddress,
                         remoteAddressString,
                         remoteAddressPrettyString,
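Note: the redispatch above no longer hands off through a disconnectInProgress CountDownLatch; it simply detects that connect() was invoked from some other internal dispatcher and re-launches itself on the dedicated CONNECT dispatcher. The ordering guarantee this leans on can be shown with plain coroutines; a self-contained sketch (these names are illustrative, not library code):

    import kotlinx.coroutines.*
    import java.util.concurrent.Executors

    fun main() = runBlocking {
        // a single-threaded dispatcher executes queued tasks strictly in submission order
        val connectDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

        launch(connectDispatcher) { println("onDisconnect cleanup") }
        // queued second, so it can never overtake the cleanup above
        launch(connectDispatcher) { println("reconnect") }

        delay(100)
        connectDispatcher.close()
    }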
@@ -403,16 +371,20 @@ open class Client<CONNECTION : Connection>(
             return
         }

-        // NOTE: it is critical to remember that Aeron DOES NOT like running from coroutines!
+        // the lifecycle of a client is the ENDPOINT (measured via the network event poller) and CONNECTION (measure from connection closed)
+        if (!waitForClose()) {
+            if (endpointIsRunning.value) {
+                listenerManager.notifyError(ServerException("Unable to start, the client is already running!"))
+            } else {
+                listenerManager.notifyError(ClientException("Unable to connect the client!"))
+            }
+            return
+        }
+
         config as ClientConfiguration
         require(connectionTimeoutSec >= 0) { "connectionTimeoutSec '$connectionTimeoutSec' is invalid. It must be >=0" }

-        if (isConnected) {
-            logger.error { "Unable to connect when already connected!" }
-            return
-        }
-
         connection0 = null

         // localhost/loopback IP might not always be 127.0.0.1 or ::1
@@ -438,15 +410,20 @@ open class Client<CONNECTION : Connection>(
             require(false) { "Cannot connect to $remoteAddressPrettyString It is an invalid address!" }
         }

+        // if there are client crashes, we want to be able to still call connect()
+        // local scope ONLY until the connection is actually made - because if there are errors we throw this one away
+        val connectLatch = CountDownLatch(1)
+
         // we are done with initial configuration, now initialize aeron and the general state of this endpoint
         // this also makes sure that the dispatchers are still active.
         // Calling `client.close()` will shutdown the dispatchers (and a new client instance must be created)
         try {
             startDriver()
             verifyState()
-            initializeLatch()
+            initializeState()
         } catch (e: Exception) {
-            logger.error(e) { "Unable to start the endpoint!" }
+            resetOnError()
+            listenerManager.notifyError(ClientException("Unable to start the client!", e))
             return
         }
@@ -464,21 +441,25 @@ open class Client<CONNECTION : Connection>(
         // how long does the initial handshake take to connect
         var handshakeTimeoutSec = 5
         // how long before we COMPLETELY give up retrying
-        var timoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
+        var connectionTimoutInNanos = TimeUnit.SECONDS.toNanos(connectionTimeoutSec.toLong())
+        var connectionCloseTimeoutInSeconds = config.connectionCloseTimeoutInSeconds.toLong()

         if (DEBUG_CONNECTIONS) {
             // connections are extremely difficult to diagnose when the connection timeout is short
-            timoutInNanos += TimeUnit.HOURS.toSeconds(1).toInt()
-            handshakeTimeoutSec += TimeUnit.HOURS.toSeconds(1).toInt()
+            connectionTimoutInNanos = TimeUnit.HOURS.toNanos(1).toLong()
+            handshakeTimeoutSec = TimeUnit.HOURS.toSeconds(1).toInt()
+            connectionCloseTimeoutInSeconds = TimeUnit.HOURS.toSeconds(1).toLong()
         }

         val startTime = System.nanoTime()
         var success = false
-        while (System.nanoTime() - startTime < timoutInNanos) {
+        while (System.nanoTime() - startTime < connectionTimoutInNanos) {
             if (isShutdown()) {
+                resetOnError()
                 // If we are connecting indefinitely, we have to make sure to end the connection process
-                val exception = ClientShutdownException("Unable to connect while shutting down")
-                logger.error(exception) { "Aborting connection retry attempt to server." }
+                val exception = ClientShutdownException("Unable to connect while shutting down, aborting connection retry attempt to server.")
                 listenerManager.notifyError(exception)
                 throw exception
             }
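Note: the retry loop measures its deadline with System.nanoTime(), which is monotonic and unaffected by wall-clock changes (the "connectionTimout" spelling is carried over verbatim from the commit). The bare pattern, as a standalone sketch:

    import java.util.concurrent.TimeUnit

    // retries attempt() until it succeeds or timeoutSec elapses (0 never enters the loop)
    fun retryWithDeadline(timeoutSec: Long, attempt: () -> Boolean): Boolean {
        val timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSec)
        val startTime = System.nanoTime()
        while (System.nanoTime() - startTime < timeoutNanos) {
            if (attempt()) return true
        }
        return false
    }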
@@ -517,10 +498,17 @@ open class Client<CONNECTION : Connection>(
                 logger = logger
             )

-            logger.debug { "Connecting to ${handshakeConnection.infoPub}" }
-            logger.debug { "Connecting to ${handshakeConnection.infoSub}" }
+            // Note: the pub/sub info is from the perspective of the SERVER
+            val pubSub = handshakeConnection.pubSub
+            val logInfo = pubSub.reverseForClient().getLogInfo(logger.isDebugEnabled)

-            connect0(handshake, handshakeConnection, handshakeTimeoutSec)
+            if (logger.isDebugEnabled) {
+                logger.debug { "Creating new handshake to $logInfo" }
+            } else {
+                logger.info { "Creating new handshake to $logInfo" }
+            }
+
+            connect0(handshake, handshakeConnection, handshakeTimeoutSec, connectionCloseTimeoutInSeconds, connectLatch)
             success = true
             slowDownForException = false
@@ -541,8 +529,6 @@ open class Client<CONNECTION : Connection>(
                     logger.info { message }
                 }

-                handshake.reset()
-
                 // maybe the aeron driver isn't running? (or isn't running correctly?)
                 aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
@@ -553,26 +539,29 @@ open class Client<CONNECTION : Connection>(
                 slowDownForException = true

                 if (e.cause is ServerException) {
+                    resetOnError()
                     val cause = e.cause!!
                     val wrapped = ClientException(cause.message!!)
                     listenerManager.notifyError(wrapped)
                     throw wrapped
                 } else {
+                    resetOnError()
                     listenerManager.notifyError(e)
                     throw e
                 }
             } catch (e: Exception) {
-                logger.error(e) { "[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting." }
                 aeronDriver.closeIfSingle() // if we are the ONLY instance using the media driver, restart it
-                listenerManager.notifyError(e)
+                listenerManager.notifyError(ClientException("[${handshake.connectKey}] : Un-recoverable error during handshake with $handshakeConnection. Aborting.", e))
+                resetOnError()
                 throw e
             }
         }
         if (!success) {
-            if (System.nanoTime() - startTime < timoutInNanos) {
+            endpointIsRunning.lazySet(false)
+            if (System.nanoTime() - startTime < connectionTimoutInNanos) {
                 val type = if (connection0 == null) {
                     "UNKNOWN"
                 } else if (isIPC) {
@@ -580,27 +569,35 @@ open class Client<CONNECTION : Connection>(
                 } else {
                     remoteAddressPrettyString + ":" + config.port
                 }

                 // we timed out. Throw the appropriate exception
-                val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds")
-                logger.error(exception) { "Aborting connection attempt to server." }
+                val exception = ClientTimedOutException("Unable to connect to the server at $type in $connectionTimeoutSec seconds, aborting connection attempt to server.")
                 listenerManager.notifyError(exception)
                 throw exception
             }

             // If we did not connect - throw an error. When `client.connect()` is called, either it connects or throws an error
-            val exception = ClientRejectedException("The server did not respond or permit the connection attempt within $connectionTimeoutSec seconds")
+            val exception = ClientRejectedException("The server did not respond or permit the connection attempt within $connectionTimeoutSec seconds, aborting connection retry attempt to server.")
             exception.cleanStackTrace()
-            logger.error(exception) { "Aborting connection retry attempt to server." }
             listenerManager.notifyError(exception)
             throw exception
         }
     }

     // the handshake process might have to restart this connection process.
-    private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: ClientHandshakeDriver, connectionTimeoutSec: Int) {
+    private suspend fun connect0(
+        handshake: ClientHandshake<CONNECTION>,
+        handshakeConnection: ClientHandshakeDriver,
+        connectionTimeoutSec: Int,
+        connectionCloseTimeoutInSeconds: Long,
+        connectLatch: CountDownLatch
+    ) {
         // this will block until the connection timeout, and throw an exception if we were unable to connect with the server
         // throws(ConnectTimedOutException::class, ClientRejectedException::class, ClientException::class)
         val connectionInfo = handshake.hello(handshakeConnection, connectionTimeoutSec, uuid)
@@ -615,7 +612,7 @@ open class Client<CONNECTION : Connection>(
             handshakeConnection.close()

             val exception = ClientRejectedException("Connection to [$remoteAddressString] not allowed! Public key mismatch.")
-            logger.error(exception) { "Validation error" }
+            listenerManager.notifyError(exception)
             throw exception
         }
@@ -642,6 +639,7 @@ open class Client<CONNECTION : Connection>(
                 ClientRejectedException("[${handshake.connectKey}] Connection to [$remoteAddressString] has incorrect class registration details!!")
             }
             exception.cleanStackTraceInternal()
+            listenerManager.notifyError(exception)
             throw exception
         }
@@ -658,10 +656,15 @@ open class Client<CONNECTION : Connection>(
         // we are now connected, so we can connect to the NEW client-specific ports
         val clientConnection = ClientConnectionDriver.build(aeronDriver, connectionTimeoutSec, handshakeConnection, connectionInfo)

-        // have to rebuild the client pub/sub for the next part of the handshake (since it's a 1-shot deal for the server per session)
-        // if we go SLOWLY (slower than the linger timeout), it will work. if we go quickly, this it will have problems (so we must do this!)
-        // handshakeConnection.resetSession(logger)
+        // Note: the pub/sub info is from the perspective of the SERVER
+        val pubSub = clientConnection.connectionInfo.reverseForClient()
+        val logInfo = pubSub.getLogInfo(logger.isDebugEnabled)
+
+        if (logger.isDebugEnabled) {
+            logger.debug { "Creating new connection to $logInfo" }
+        } else {
+            logger.info { "Creating new connection to $logInfo" }
+        }

         val newConnection: CONNECTION
         if (handshakeConnection.pubSub.isIpc) {
@@ -677,41 +680,6 @@ open class Client<CONNECTION : Connection>(
             storage.addRegisteredServerKey(remoteAddress!!, connectionInfo.publicKey)
         }

-        // This is set by the client so if there is a "connect()" call in the disconnect callback, we can have proper
-        // lock-stop ordering for how disconnect and connect work with each-other
-        // GUARANTEE that the callbacks for 'onDisconnect' happens-before the 'onConnect'.
-        // a CDL is used because it doesn't matter the order in which it's called (as it will always ensure it's correct)
-        val lockStepForConnect = dorkbox.util.sync.CountDownLatch(1)
-        val connectWaitTimeout = if (EventPoller.DEBUG) 99999999L else config.connectionCloseTimeoutInSeconds.toLong()
-
-        //////////////
-        /// Extra Close action
-        //////////////
-        newConnection.closeAction = {
-            // this is called whenever connection.close() is called by the framework or via client.close()
-            isConnected = false
-
-            // make sure to call our client.notifyConnect() callbacks execute
-            // force us to wait until AFTER the connect logic has run. we MUST use a CDL. A mutex doesn't work properly
-            lockStepForConnect.await(connectWaitTimeout, TimeUnit.SECONDS)
-
-            EventDispatcher.launch(EVENT.DISCONNECT) {
-                listenerManager.notifyDisconnect(connection)
-
-                // we must reset the disconnect-in-progress latch AND count down, so that reconnects can successfully reconnect
-                val disconnectCDL = disconnectInProgress.getAndSet(null)!!
-                disconnectCDL.countDown()
-            }
-        }
-
-        // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
-        EventDispatcher.launch(EVENT.INIT) {
-            listenerManager.notifyInit(newConnection)
-        }
-
         connection0 = newConnection
         addConnection(newConnection)
@@ -721,54 +689,57 @@ open class Client<CONNECTION : Connection>(
         try {
             handshake.done(handshakeConnection, clientConnection, connectionTimeoutSec, handshakeConnection.details)
         } catch (e: Exception) {
-            logger.error(e) { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] error during handshake" }
+            listenerManager.notifyError(ClientHandshakeException("[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] error during handshake", e))
             throw e
         }

         // finished with the handshake, so always close these!
         handshakeConnection.close()

-        isConnected = true
         logger.debug { "[${handshakeConnection.details}] (${handshake.connectKey}) Connection (${newConnection.id}) to [$remoteAddressString] done with handshake." }

+        // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
+        listenerManager.notifyInit(newConnection)
+
         // have to make a new thread to listen for incoming data!
         // SUBSCRIPTIONS ARE NOT THREAD SAFE! Only one thread at a time can poll them
-        // additionally, if we have MULTIPLE clients on the same machine, we are limited by the CPU core count. Ideally we want to share this among ALL clients within the same JVM so that we can support multiple clients/servers
-        networkEventPoller.submit {
-            if (!isShutdown()) {
-                if (!newConnection.isClosedViaAeron()) {
-                    // Polls the AERON media driver subscription channel for incoming messages
-                    newConnection.poll()
-                } else {
-                    // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
-                    logger.debug { "[${handshakeConnection.details}] connection expired (cleanup)" }
-
-                    // When we close via a message (or when the connection timeout has expired), we do not flush the state!
-                    // NOTE: the state is ONLY flushed when client.close() is called!
-                    EventDispatcher.launch(EVENT.CLOSE) {
-                        newConnection.close(enableRemove = true)
-                    }
-
-                    // remove ourselves from processing
-                    EventPoller.REMOVE
-                }
-            } else {
-                // remove ourselves from processing
-                EventPoller.REMOVE
-            }
-        }
-
-        // if something inside-of listenerManager.notifyConnect is blocking or suspends, then polling will never happen!
-        // This must be on a different thread
-        EventDispatcher.launch(EVENT.CONNECT) {
-            // what happens if the disconnect runs INSIDE the connect?
-            listenerManager.notifyConnect(newConnection)
-
-            // now the disconnect logic can run because we are done with the connect logic.
-            lockStepForConnect.countDown()
-        }
+        networkEventPoller.submit(
+            action = {
+                // if we initiate a disconnect manually, then there is no need to wait for aeron to verify it's closed
+                // we only want to wait for aeron to verify it's closed if we are SUPPOSED to be connected, but there's a network blip
+                if (!(shutdownEventPoller || newConnection.isClosedViaAeron())) {
+                    newConnection.poll()
+                } else {
+                    // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
+                    logger.debug { "[${connection}] connection expired (cleanup)" }
+
+                    // the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
+                    removeConnection(newConnection)
+
+                    // we already removed the connection, we can call it again without side affects
+                    newConnection.close()
+
+                    // remove ourselves from processing
+                    EventPoller.REMOVE
+                }
+            },
+            onShutdown = {
+                // this can be closed when the connection is remotely closed in ADDITION to manually closing
+                logger.debug { "Client event dispatch closing..." }
+
+                // we only need to run shutdown methods if there was a network outage or D/C
+                if (!shutdownInProgress.value) {
+                    this@Client.closeSuspending(true)
+                }
+
+                // we can now call connect again
+                endpointIsRunning.lazySet(false)
+                pollerClosedLatch.countDown()
+
+                logger.debug { "Closed the Network Event Poller..." }
+            })
+
+        listenerManager.notifyConnect(newConnection)
     }
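Note: the client now registers with the same action/onShutdown pair the server below already uses: action is polled repeatedly and returns a work count, returning EventPoller.REMOVE unregisters the task, and onShutdown then runs for final cleanup. A skeletal stand-in for that contract (the poller here is illustrative scaffolding; only the shape is taken from this diff):

    // minimal stand-in for the action/onShutdown poller contract
    const val REMOVE = -1

    class MiniPoller {
        fun submit(action: () -> Int, onShutdown: () -> Unit) {
            Thread {
                // poll until the task asks to be unregistered
                // (the return value would normally feed an idle strategy)
                while (action() != REMOVE) { }
                onShutdown() // cleanup runs exactly once, after the task is removed
            }.start()
        }
    }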
     /**
@@ -790,7 +761,7 @@ open class Client<CONNECTION : Connection>(
         get() = connection.isNetwork

     /**
-     * @return the connection (TCP or IPC) id of this connection.
+     * @return the connection id of this connection.
      */
     val id: Int
         get() = connection.id
@@ -813,27 +784,8 @@ open class Client<CONNECTION : Connection>(
         return if (c != null) {
             c.send(message)
         } else {
-            val exception = ClientException("Cannot send a message when there is no connection!")
-            logger.error(exception) { "No connection!" }
-            false
-        }
-    }
-
-    /**
-     * Sends a message to the server, if the connection is closed for any reason, this returns false.
-     *
-     * @return true if the message was sent successfully, false if the connection has been closed
-     */
-    fun sendBlocking(message: Any): Boolean {
-        val c = connection0
-        return if (c != null) {
-            runBlocking {
-                c.send(message)
-            }
-        } else {
-            val exception = ClientException("Cannot send a message when there is no connection!")
-            logger.error(exception) { "No connection!" }
+            val exception = TransmitException("Cannot send a message when there is no connection!")
+            listenerManager.notifyError(exception)
             false
         }
     }
@@ -851,23 +803,13 @@ open class Client<CONNECTION : Connection>(
         if (c != null) {
             return super.ping(c, pingTimeoutSeconds, function)
         } else {
-            logger.error(ClientException("Cannot send a ping when there is no connection!")) { "No connection!" }
+            val exception = TransmitException("Cannot send a ping when there is no connection!")
+            listenerManager.notifyError(exception)
         }

         return false
     }

-    /**
-     * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
-     *
-     * @param function called when the ping returns (ie: update time/latency counters/metrics/etc)
-     */
-    fun pingBlocking(pingTimeoutSeconds: Int = config.pingTimeoutSeconds, function: suspend Ping.() -> Unit): Boolean {
-        return runBlocking {
-            ping(pingTimeoutSeconds, function)
-        }
-    }
-
     /**
      * Removes the specified host address from the list of registered server keys.
      */
@@ -879,8 +821,24 @@ open class Client<CONNECTION : Connection>(
         }
     }

-    final override suspend fun close0() {
-        // no impl
+    /**
+     * Will throw an exception if there are resources that are still in use
+     */
+    fun checkForMemoryLeaks() {
+        AeronDriver.checkForMemoryLeaks()
+    }
+
+    /**
+     * If you call close() on the client endpoint, it will shut down all parts of the endpoint (listeners, driver, event polling, etc).
+     */
+    fun close() {
+        runBlocking {
+            closeSuspending()
+        }
+    }
+
+    override fun toString(): String {
+        return "EndPoint [Client: $uuid]"
     }

     fun <R> use(block: (Client<CONNECTION>) -> R): R {
@@ -888,10 +846,6 @@ open class Client<CONNECTION : Connection>(
             block(this)
         } finally {
             close()
-
-            runBlocking {
-                waitForClose()
-                logger.error { "finished close event" }
-            }
         }
     }
 }
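Note: with close() now blocking internally (runBlocking { closeSuspending() }), use {} no longer needs the removed waitForClose() tail. A hypothetical lifecycle under that shape (constructor arguments and the exact connect/send signatures are assumptions, not taken from this diff):

    val client = Client<Connection>(ClientConfiguration())
    client.use {
        // blocks until connected, or throws (timed out / rejected / shutting down)
        it.connect(remoteAddress = "127.0.0.1")
        runBlocking {
            // suspend call; returns false (and routes a TransmitException to the
            // error listeners) when there is no connection
            it.send("hello")
        }
    } // use {} ends by calling close(), which now blocks until shutdown completes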

Server.kt

@@ -21,18 +21,14 @@ import dorkbox.network.aeron.EventPoller
 import dorkbox.network.connection.Connection
 import dorkbox.network.connection.ConnectionParams
 import dorkbox.network.connection.EndPoint
-import dorkbox.network.connection.EventDispatcher
-import dorkbox.network.connection.EventDispatcher.Companion.EVENT
 import dorkbox.network.connection.IpInfo
 import dorkbox.network.connection.IpInfo.Companion.IpListenType
 import dorkbox.network.connectionType.ConnectionRule
-import dorkbox.network.exceptions.AllocationException
 import dorkbox.network.exceptions.ServerException
 import dorkbox.network.handshake.ServerHandshake
 import dorkbox.network.handshake.ServerHandshakePollers
 import dorkbox.network.ipFilter.IpFilterRule
 import dorkbox.network.rmi.RmiSupportServer
-import kotlinx.atomicfu.atomic
 import kotlinx.coroutines.runBlocking
 import mu.KotlinLogging
 import java.util.concurrent.*
@@ -137,9 +133,7 @@ open class Server<CONNECTION : Connection>(
         val timeout = TimeUnit.SECONDS.toMillis(configuration.connectionCloseTimeoutInSeconds.toLong() * 2)
         val logger = KotlinLogging.logger(Server::class.java.simpleName)

-        AeronDriver(configuration, logger).use {
-            it.ensureStopped(timeout, 500)
-        }
+        AeronDriver.ensureStopped(configuration, logger, timeout)
     }

     /**
@@ -151,9 +145,7 @@ open class Server<CONNECTION : Connection>(
      */
     fun isRunning(configuration: ServerConfiguration): Boolean = runBlocking {
         val logger = KotlinLogging.logger(Server::class.java.simpleName)

-        AeronDriver(configuration, logger).use {
-            it.isRunning()
-        }
+        AeronDriver.isRunning(configuration, logger)
     }

     init {
@@ -167,20 +159,6 @@ open class Server<CONNECTION : Connection>(
      */
     val rmiGlobal = RmiSupportServer(logger, rmiGlobalSupport)

-    /**
-     * @return true if this server has successfully bound to an IP address and is running
-     */
-    private var bindAlreadyCalled = atomic(false)
-
-    /**
-     * These are run in lock-step to shutdown/close the server. Afterwards, bind() can be called again
-     */
-    @Volatile
-    private var shutdownPollLatch = dorkbox.util.sync.CountDownLatch(0)
-
-    @Volatile
-    private var shutdownEventLatch = dorkbox.util.sync.CountDownLatch(0)
-
     /**
      * Maintains a thread-safe collection of rules used to define the connection type with this server.
      */
@@ -191,6 +169,9 @@ open class Server<CONNECTION : Connection>(
      */
     internal val ipInfo = IpInfo(config)

+    @Volatile
+    internal lateinit var handshake: ServerHandshake<CONNECTION>
+
     final override fun newException(message: String, cause: Throwable?): Throwable {
         return ServerException(message, cause)
     }
@@ -220,23 +201,20 @@ open class Server<CONNECTION : Connection>(
         try {
             startDriver()
             verifyState()
-            initializeLatch()
+            initializeState()
         } catch (e: Exception) {
             resetOnError()
             listenerManager.notifyError(ServerException("Unable to start the server!", e))
             return@runBlocking
         }

-        shutdownPollLatch = dorkbox.util.sync.CountDownLatch(1)
-        shutdownEventLatch = dorkbox.util.sync.CountDownLatch(1)
-
         config as ServerConfiguration

         // we are done with initial configuration, now initialize aeron and the general state of this endpoint
         val server = this@Server
-        val handshake = ServerHandshake(logger, config, listenerManager, aeronDriver)
+        handshake = ServerHandshake(config, listenerManager, aeronDriver)

         val ipcPoller: AeronPoller = if (config.enableIpc) {
             ServerHandshakePollers.ipc(server, handshake)
@@ -251,26 +229,20 @@ open class Server<CONNECTION : Connection>(
             IpListenType.IPv6Wildcard -> ServerHandshakePollers.ip6(server, handshake)
             IpListenType.IPv4 -> ServerHandshakePollers.ip4(server, handshake)
             IpListenType.IPv6 -> ServerHandshakePollers.ip6(server, handshake)
-            IpListenType.IPC -> ServerHandshakePollers.disabled("IP Disabled")
+            IpListenType.IPC -> ServerHandshakePollers.disabled("IPv4/6 Disabled")
         }

         logger.info { ipcPoller.info }
         logger.info { ipPoller.info }

-        // additionally, if we have MULTIPLE clients on the same machine, we are limited by the CPU core count. Ideally we want to share this among ALL clients within the same JVM so that we can support multiple clients/servers
         networkEventPoller.submit(
             action = {
-                if (!isShutdown()) {
-                    var pollCount = 0
+                if (!shutdownEventPoller) {
                     // NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
                     // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`

-                    // this checks to see if there are NEW clients on the handshake ports
-                    pollCount += ipPoller.poll()
-
-                    // this checks to see if there are NEW clients via IPC
-                    pollCount += ipcPoller.poll()
+                    // this checks to see if there are NEW clients to handshake with
+                    var pollCount = ipcPoller.poll() + ipPoller.poll()

                     // this manages existing clients (for cleanup + connection polling). This has a concurrent iterator,
                     // so we can modify this as we go
@@ -280,25 +252,13 @@ open class Server<CONNECTION : Connection>(
                         pollCount += connection.poll()
                     } else {
                         // If the connection has either been closed, or has expired, it needs to be cleaned-up/deleted.
-                        logger.debug { "[${connection.details}] connection expired (cleanup)" }
+                        logger.debug { "[${connection}] connection expired (cleanup)" }

-                        // the connection MUST be removed in the same thread that is processing events
+                        // the connection MUST be removed in the same thread that is processing events (it will be removed again in close, and that is expected)
                         removeConnection(connection)

-                        // this will call removeConnection again, but that is ok
-                        EventDispatcher.launch(EVENT.CLOSE) {
-                            // we already removed the connection
-                            connection.close(enableRemove = false)
-
-                            // have to manually notify the server-listenerManager that this connection was closed
-                            // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listener-manager is
-                            // instantly notified and on cleanup, the server-listener-manager is called
-                            // this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
-                            EventDispatcher.launch(EVENT.DISCONNECT) {
-                                listenerManager.notifyDisconnect(connection)
-                            }
-                        }
+                        // we already removed the connection, we can call it again without side affects
+                        connection.close()
                     }
                 }
@@ -311,63 +271,21 @@ open class Server<CONNECTION : Connection>(
             onShutdown = {
                 logger.debug { "Server event dispatch closing..." }

-                // we want to process **actual** close cleanup events on this thread as well, otherwise we will have threading problems
-                shutdownPollLatch.await()
-
-                // we want to clear all the connections FIRST (since we are shutting down)
-                val cons = mutableListOf<CONNECTION>()
-                connections.forEach { cons.add(it) }
-                connections.clear()
-
-                // when we close a client or a server, we want to make sure that ALL notifications are finished.
-                // when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
-                // we have to manually clean-up the connections and call server-notifyDisconnect because otherwise this will never get called
-                try {
-                    cons.forEach { connection ->
-                        logger.info { "[${connection.details}] Connection cleanup and close" }
-
-                        // make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
-                        EventDispatcher.launch(EVENT.CLOSE) {
-                            connection.close(enableRemove = true)
-
-                            // have to manually notify the server-listenerManager that this connection was closed
-                            // if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
-                            // instantly notified and on cleanup, the server-listenermanager is called
-
-                            // NOTE: this must be the LAST thing happening!
-                            // the SERVER cannot re-connect to clients, only clients can call 'connect'.
-                            EventDispatcher.launch(EVENT.DISCONNECT) {
-                                listenerManager.notifyDisconnect(connection)
-                            }
-                        }
-                    }
-                } finally {
-                    ipcPoller.close()
-                    ipPoller.close()
-
-                    // clear all the handshake info
-                    handshake.clear()
-
-                    try {
-                        AeronDriver.checkForMemoryLeaks()
-
-                        // make sure that we have de-allocated all connection data
-                        handshake.checkForMemoryLeaks()
-                    } catch (e: AllocationException) {
-                        logger.error(e) { "Error during server cleanup" }
-                    }
-
-                    // finish closing -- this lets us make sure that we don't run into race conditions on the thread that calls close()
-                    try {
-                        shutdownEventLatch.countDown()
-                    } catch (ignored: Exception) {
-                    }
-                }
+                ipcPoller.close()
+                ipPoller.close()
+
+                // clear all the handshake info
+                handshake.clear()
+
+                // we can now call bind again
+                endpointIsRunning.lazySet(false)
+                pollerClosedLatch.countDown()
+
+                logger.debug { "Closed the Network Event Poller..." }
             })
     }

     /**
      * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have.
      * - NOTHING : Nothing happens to the in/out bytes
@@ -435,32 +353,29 @@ open class Server<CONNECTION : Connection>(
         }
     }

-    fun close() {
-        close(false) {}
-    }
-
-    fun close(onCloseFunction: () -> Unit = {}) {
-        close(false, onCloseFunction)
-    }
-
-    final override fun close(shutdownEndpoint: Boolean, onCloseFunction: () -> Unit) {
-        super.close(shutdownEndpoint, onCloseFunction)
+    /**
+     * Will throw an exception if there are resources that are still in use
+     */
+    fun checkForMemoryLeaks() {
+        AeronDriver.checkForMemoryLeaks()
+
+        // make sure that we have de-allocated all connection data
+        handshake.checkForMemoryLeaks()
     }

     /**
-     * Closes the server and all it's connections. After a close, you may call 'bind' again.
+     * If you call close() on the server endpoint, it will shut down all parts of the endpoint (listeners, driver, event polling, etc).
      */
-    final override suspend fun close0() {
-        // when we call close, it will shutdown the polling mechanism, then wait for us to tell it to clean-up connections.
-        //
-        // Aeron + the Media Driver will have already been shutdown at this point.
-        if (bindAlreadyCalled.getAndSet(false)) {
-            // These are run in lock-step
-            shutdownPollLatch.countDown()
-            shutdownEventLatch.await()
-        }
+    fun close() {
+        runBlocking {
+            closeSuspending()
+        }
+    }
+
+    override fun toString(): String {
+        return "EndPoint [Server]"
     }

     /**
      * Enable
      */
@@ -469,11 +384,6 @@ open class Server<CONNECTION : Connection>(
             block(this)
         } finally {
             close()
-
-            runBlocking {
-                waitForClose()
-                logger.error { "finished close event" }
-            }
         }
     }
 }
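Note: the server side mirrors the client's lifecycle reshaping: bind() starts the driver and the handshake/event pollers, close() blocks via runBlocking { closeSuspending() }, and the new checkForMemoryLeaks() can assert a clean teardown afterwards. A hypothetical end-to-end sketch (constructor shape assumed):

    val server = Server<Connection>(ServerConfiguration())
    server.bind()                   // starts the media driver and the handshake/event pollers

    // ... accept connections, serve traffic ...

    server.close()                  // blocks until pollers, listeners, and the driver are down
    server.checkForMemoryLeaks()    // throws if driver resources or connection data leaked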