code cleanup and comments

This commit is contained in:
Robinson 2023-10-26 14:57:48 +02:00
parent d7884c4d8d
commit 7f2ad97aa7
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
8 changed files with 66 additions and 82 deletions

View File

@ -69,7 +69,6 @@ class ServerConfiguration : dorkbox.network.Configuration() {
field = value
}
/**
* If a connection is in a temporal state (in the middle of a reconnect) and sessions are enabled -- then how long should we consider
* a new connection from the same client as part of the same session.
@ -82,7 +81,6 @@ class ServerConfiguration : dorkbox.network.Configuration() {
field = value
}
/**
* Allows the user to change how endpoint settings and public key information are saved.
*/
@ -92,7 +90,6 @@ class ServerConfiguration : dorkbox.network.Configuration() {
field = value
}
/**
* Validates the current configuration
*/
@ -440,7 +437,6 @@ abstract class Configuration protected constructor() {
field = value
}
/**
* How long a connection must be disconnected before we cleanup the memory associated with it
*/

View File

@ -20,6 +20,7 @@ import dorkbox.network.aeron.*
import dorkbox.network.connection.*
import dorkbox.network.connection.IpInfo.Companion.IpListenType
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
import dorkbox.network.connection.session.SessionConnection
import dorkbox.network.connection.session.SessionManagerFull
import dorkbox.network.connection.session.SessionServer
import dorkbox.network.connectionType.ConnectionRule
@ -109,7 +110,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
init {
if (this is SessionServer) {
// only set this if we need to
sessionManager = SessionManagerFull(config, aeronDriver, config.sessionTimeoutSeconds)
@Suppress("UNCHECKED_CAST")
sessionManager = SessionManagerFull(config, listenerManager as ListenerManager<SessionConnection>, aeronDriver, config.sessionTimeoutSeconds)
}
}
@ -135,7 +137,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
if (config.enableIPv4) { logger.warn("IPv4 is enabled, but only IPC will be used.") }
if (config.enableIPv6) { logger.warn("IPv6 is enabled, but only IPC will be used.") }
bind(0, 0, true)
internalBind(port1 = 0, port2 = 0, onlyBindIpc = true, runShutdownCheck = true)
}
/**
@ -155,18 +157,18 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
require(port2 < 65535) { "port2 must be < 65535" }
}
bind(port1, port2, false)
internalBind(port1 = port1, port2 = port2, onlyBindIpc = false, runShutdownCheck = true)
}
@Suppress("DuplicatedCode")
private fun bind(port1: Int, port2: Int, onlyBindIpc: Boolean) {
private fun internalBind(port1: Int, port2: Int, onlyBindIpc: Boolean, runShutdownCheck: Boolean) {
// the lifecycle of a server is the ENDPOINT (measured via the network event poller)
if (endpointIsRunning.value) {
listenerManager.notifyError(ServerException("Unable to start, the server is already running!"))
return
}
if (!waitForEndpointShutdown()) {
if (runShutdownCheck && !waitForEndpointShutdown()) {
listenerManager.notifyError(ServerException("Unable to start the server!"))
return
}
@ -189,7 +191,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
val server = this@Server
handshake = ServerHandshake(config, listenerManager, aeronDriver)
handshake = ServerHandshake(config, listenerManager, aeronDriver, eventDispatch)
val ipcPoller: AeronPoller = if (config.enableIpc || onlyBindIpc) {
ServerHandshakePollers.ipc(server, handshake)
@ -270,13 +272,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// we only need to run shutdown methods if there was a network outage or D/C
if (!shutdownInProgress.value) {
// this is because we restart automatically on driver errors
val standardClose = !mustRestartDriverOnError
this@Server.close(
closeEverything = false,
sendDisconnectMessage = standardClose,
notifyDisconnect = standardClose,
releaseWaitingThreads = standardClose
)
this@Server.close(closeEverything = false, sendDisconnectMessage = true, releaseWaitingThreads = !mustRestartDriverOnError)
}
@ -301,18 +297,17 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
val p2 = this@Server.port2
if (p1 == 0 && p2 == 0) {
bindIpc()
internalBind(port1 = 0, port2 = 0, onlyBindIpc = true, runShutdownCheck = false)
} else {
bind(p1, p2)
internalBind(port1 = p1, port2 = p2, onlyBindIpc = false, runShutdownCheck = false)
}
}
}
// we can now call bind again
endpointIsRunning.lazySet(false)
logger.debug("Closed the Network Event Poller task.")
pollerClosedLatch.countDown()
logger.debug("Closed the Network Event Poller...")
}
})
}
@ -397,12 +392,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
* @param closeEverything if true, all parts of the server will be closed (listeners, driver, event polling, etc)
*/
fun close(closeEverything: Boolean = true) {
close(
closeEverything = closeEverything,
sendDisconnectMessage = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)
close(closeEverything = closeEverything, sendDisconnectMessage = true, releaseWaitingThreads = true)
}
override fun toString(): String {

View File

@ -918,9 +918,7 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo
// publication was actually closed or the server was closed, so no bother throwing an error
connection.closeImmediately(
sendDisconnectMessage = false,
notifyDisconnect = true,
closeEverything = false
sendDisconnectMessage = false, closeEverything = false
)
return false

View File

@ -197,10 +197,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
// close will make sure to run on a different thread
endPointUsages.forEach {
it.close(closeEverything = false,
sendDisconnectMessage = false,
notifyDisconnect = false,
releaseWaitingThreads = false)
// we cannot send the DC message because the network layer has issues!
it.close(closeEverything = false, sendDisconnectMessage = false, releaseWaitingThreads = false)
}
}
}

View File

@ -255,7 +255,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
fun onDisconnect(function: Connection.() -> Unit) {
// make sure we atomically create the listener manager, if necessary
listenerManager.getAndUpdate { origManager ->
origManager ?: ListenerManager(logger)
origManager ?: ListenerManager(logger, endPoint.eventDispatch)
}
listenerManager.value!!.onDisconnect(function)
@ -267,7 +267,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
fun <MESSAGE> onMessage(function: Connection.(MESSAGE) -> Unit) {
// make sure we atomically create the listener manager, if necessary
listenerManager.getAndUpdate { origManager ->
origManager ?: ListenerManager(logger)
origManager ?: ListenerManager(logger, endPoint.eventDispatch)
}
listenerManager.value!!.onMessage(function)
@ -323,17 +323,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*/
fun close() {
close(sendDisconnectMessage = true,
notifyDisconnect = true,
closeEverything = true)
}
/**
* Closes the connection, and removes all connection specific listeners
*/
internal fun close(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean, closeEverything: Boolean) {
internal fun close(sendDisconnectMessage: Boolean, closeEverything: Boolean) {
// there are 2 ways to call close.
// MANUALLY
// When a connection is disconnected via a timeout/expire.
// the compareAndSet is used to make sure that if we call close() MANUALLY, (and later) when the auto-cleanup/disconnect is called -- it doesn't
// try to do it again.
@ -342,28 +342,26 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val close = endPoint.eventDispatch.CLOSE
if (!close.isDispatch()) {
close.launch {
close(sendDisconnectMessage, notifyDisconnect, closeEverything)
close(sendDisconnectMessage = sendDisconnectMessage, closeEverything = closeEverything)
}
return
}
closeImmediately(
sendDisconnectMessage = sendDisconnectMessage,
notifyDisconnect = notifyDisconnect,
closeEverything = closeEverything)
closeImmediately(sendDisconnectMessage = sendDisconnectMessage, closeEverything = closeEverything)
}
// connection.close() -> this
// endpoint.close() -> connection.close() -> this
internal fun closeImmediately(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean, closeEverything: Boolean) {
internal fun closeImmediately(sendDisconnectMessage: Boolean, closeEverything: Boolean) {
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
if (!isClosed.compareAndSet(expect = false, update = true)) {
logger.debug("[$toString0] connection ignoring close request.")
return
}
if (logger.isDebugEnabled) {
logger.debug("[$toString0] connection closing. sendDisconnectMessage=$sendDisconnectMessage, notifyDisconnect=$notifyDisconnect, closeEverything=$closeEverything")
logger.debug("[$toString0] connection closing. sendDisconnectMessage=$sendDisconnectMessage, closeEverything=$closeEverything")
}
// make sure to save off the RMI objects for session management
@ -397,7 +395,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// on close, we want to make sure this file is DELETED!
try {
// we might not be able to close this connection.
endPoint.aeronDriver.close(publication, toString0)
@ -407,11 +404,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
}
// NOTE: any waiting RMI messages that are in-flight will terminate when they time-out (and then do nothing)
// NOTE: notifyDisconnect() is called inside closeAction()!!
if (notifyDisconnect) {
// if there are errors within the driver, we do not want to notify disconnect, as we will automatically reconnect.
endPoint.listenerManager.notifyDisconnect(this)
}
// if there are errors within the driver, we do not want to notify disconnect, as we will automatically reconnect.
endPoint.listenerManager.notifyDisconnect(this)
endPoint.removeConnection(this)

View File

@ -267,12 +267,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
hook = Thread {
close(
closeEverything = true,
sendDisconnectMessage = true,
notifyDisconnect = true,
releaseWaitingThreads = true
)
close(closeEverything = true, sendDisconnectMessage = true, releaseWaitingThreads = true)
}
Runtime.getRuntime().addShutdownHook(hook)
@ -906,11 +901,17 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
internal fun close(
closeEverything: Boolean,
sendDisconnectMessage: Boolean,
notifyDisconnect: Boolean,
releaseWaitingThreads: Boolean)
{
if (!eventDispatch.CLOSE.isDispatch()) {
eventDispatch.CLOSE.launch {
close(closeEverything, sendDisconnectMessage, releaseWaitingThreads)
}
return
}
if (logger.isDebugEnabled) {
logger.debug("Requesting close: closeEverything=$closeEverything, sendDisconnectMessage=$sendDisconnectMessage, notifyDisconnect=$notifyDisconnect, releaseWaitingThreads=$releaseWaitingThreads")
logger.debug("Requesting close: closeEverything=$closeEverything, sendDisconnectMessage=$sendDisconnectMessage, releaseWaitingThreads=$releaseWaitingThreads")
}
// 1) endpoints can call close()
@ -948,7 +949,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
EventDispatcher.CLOSE.launch {
if (logger.isDebugEnabled) {
logger.debug("Shutting down endpoint...")
}
@ -972,6 +974,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// this waits for the ENDPOINT to finish running its tasks in the poller.
pollerClosedLatch.await()
// this will ONLY close the event dispatcher if ALL endpoints have closed it.
// when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed.
networkEventPoller.close(logger, this)
@ -991,7 +995,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
// don't do these things if we are "closed" from a client connection disconnect
// if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
if (closeEverything) {
// when the client connection is closed, we don't close the driver/etc.
@ -1002,6 +1005,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
storage.close()
}
// we might be restarting the aeron driver, so make sure it's closed.
aeronDriver.close()
shutdown = true
@ -1016,8 +1020,6 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
logger.info("Done shutting down the endpoint.")
}
}
}
/**

View File

@ -23,6 +23,7 @@ import dorkbox.network.Configuration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.util.Sys
import net.jodah.expiringmap.ExpirationPolicy
import net.jodah.expiringmap.ExpiringMap
@ -31,8 +32,10 @@ import java.util.concurrent.*
internal open class SessionManagerFull<CONNECTION: SessionConnection>(
config: Configuration,
listenerManager: ListenerManager<CONNECTION>,
val aeronDriver: AeronDriver,
sessionTimeout: Long): SessionManager<CONNECTION> {
sessionTimeout: Long
): SessionManager<CONNECTION> {
companion object {
private val logger = LoggerFactory.getLogger(SessionManagerFull::class.java.simpleName)
@ -59,10 +62,12 @@ internal open class SessionManagerFull<CONNECTION: SessionConnection>(
expiringSessions = ExpiringMap.builder()
.expiration(sessionTimeout, timeUnit)
.expirationPolicy(ExpirationPolicy.CREATED)
.expirationListener<ByteArrayWrapper, Session<CONNECTION>> { publicKeyWrapped, _ ->
.expirationListener<ByteArrayWrapper, Session<CONNECTION>> { publicKeyWrapped, sessionConnection ->
// this blocks until it fully runs (which is ok. this is fast)
logger.debug("Connection session expired for: ${publicKeyWrapped.bytes.toHexString()}")
// this SESSION has expired, so we should call the onDisconnect for the underlying connection, in order to clean it up.
listenerManager.notifyDisconnect(sessionConnection.connection)
}
.build()
}
@ -87,13 +92,11 @@ internal open class SessionManagerFull<CONNECTION: SessionConnection>(
if (expiring != null) {
// we must always set this session value!!
connection.session = expiring
expiring
} else {
val existing = sessions[publicKeyWrapped]
if (existing != null) {
// we must always set this session value!!
connection.session = existing
existing
} else {
@Suppress("UNCHECKED_CAST")
val newSession = (connection.endPoint as SessionEndpoint<CONNECTION>).newSession(connection as CONNECTION)
@ -102,7 +105,6 @@ internal open class SessionManagerFull<CONNECTION: SessionConnection>(
connection.session = newSession
sessions[publicKeyWrapped] = newSession
newSession
}
}
}
@ -114,14 +116,12 @@ internal open class SessionManagerFull<CONNECTION: SessionConnection>(
*
* @return true if this is a new session, false if it is an existing session
*/
@Suppress("UNCHECKED_CAST")
override fun onInit(connection: Connection): Boolean {
// we know this will always be the case, because if this specific method can be called, then it will be a sessionConnection
connection as SessionConnection
@Suppress("UNCHECKED_CAST")
val session: Session<CONNECTION> = connection.session as Session<CONNECTION>
@Suppress("UNCHECKED_CAST")
session.restore(connection as CONNECTION)
// the FIRST time this method is called, it will be true. EVERY SUBSEQUENT TIME, it will be false
@ -133,16 +133,21 @@ internal open class SessionManagerFull<CONNECTION: SessionConnection>(
* Always called when a connection is disconnected from the network
*/
override fun onDisconnect(connection: CONNECTION) {
val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid)
try {
val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid)
val session = synchronized(sessions) {
val session = sessions.remove(publicKeyWrapped)
// we want to expire this session after XYZ time
expiringSessions[publicKeyWrapped] = session
session
val session = synchronized(sessions) {
val sess = sessions.remove(publicKeyWrapped)
// we want to expire this session after XYZ time
expiringSessions[publicKeyWrapped] = sess
sess
}
session!!.save(connection)
}
catch (e: Exception) {
logger.error("Unable to run save data for the session!", e)
}
session!!.save(connection)
}
}

View File

@ -45,7 +45,8 @@ import java.util.concurrent.*
internal class ServerHandshake<CONNECTION : Connection>(
private val config: ServerConfiguration,
private val listenerManager: ListenerManager<CONNECTION>,
val aeronDriver: AeronDriver
private val aeronDriver: AeronDriver,
private val eventDispatch: EventDispatcher
) {
// note: the expire time here is a LITTLE longer than the expire time in the client, this way we can adjust for network lag if it's close
@ -701,7 +702,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
val connections = pendingConnections
val latch = CountDownLatch(connections.size)
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
eventDispatch.CLOSE.launch {
connections.forEach { (_, v) ->
v.close()
latch.countDown()