diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 86743d64..2c305a41 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -25,14 +25,14 @@ import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.EventActionOperator import dorkbox.network.aeron.EventCloseOperator import dorkbox.network.aeron.EventPoller -import dorkbox.network.connection.* +import dorkbox.network.connection.Connection +import dorkbox.network.connection.ConnectionParams +import dorkbox.network.connection.EndPoint import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal -import dorkbox.network.connection.session.SessionClient -import dorkbox.network.connection.session.SessionConnection -import dorkbox.network.connection.session.SessionManagerFull -import dorkbox.network.connection.session.SessionManagerNoOp +import dorkbox.network.connection.PublicKeyValidationState +import dorkbox.network.connection.buffer.BufferManager import dorkbox.network.exceptions.* import dorkbox.network.handshake.ClientConnectionDriver import dorkbox.network.handshake.ClientHandshake @@ -148,6 +148,14 @@ open class Client(config: ClientConfiguration = ClientC @Volatile private var stopConnectOnShutdown = false + /** + * Different connections (to the same client) can be "buffered", meaning that if they "go down" because of a network glitch -- the data + * being sent is not lost (it is buffered) and then re-sent once the new connection is established. References to the old connection + * will also redirect to the new connection. + */ + @Volatile + internal var bufferedManager: BufferManager? = null + // is valid when there is a connection to the server, otherwise it is null @Volatile private var connection0: CONNECTION? = null @@ -420,7 +428,6 @@ open class Client(config: ClientConfiguration = ClientC require(port1 < 65535) { "port1 must be < 65535" } require(port2 < 65535) { "port2 must be < 65535" } } - require(connectionTimeoutSec >= 0) { "connectionTimeoutSec '$connectionTimeoutSec' is invalid. It must be >=0" } // only try to connect via IPv4 if we have a network interface that supports it! @@ -550,7 +557,7 @@ open class Client(config: ClientConfiguration = ClientC // the handshake connection is closed when the handshake has an error, or it is finished - var handshakeConnection: ClientHandshakeDriver? = null + var handshakeConnection: ClientHandshakeDriver? try { // always start the aeron driver inside the restart loop. @@ -699,6 +706,9 @@ open class Client(config: ClientConfiguration = ClientC handshakeTimeoutNs = handshakeTimeoutNs ) + bufferedManager = BufferManager(config, listenerManager, aeronDriver, connectionInfo.sessionTimeout) + + // VALIDATE:: check to see if the remote connection's public key has changed! val validateRemoteAddress = if (handshakeConnection.pubSub.isIpc) { PublicKeyValidationState.VALID @@ -720,13 +730,6 @@ open class Client(config: ClientConfiguration = ClientC // is rogue, we do not want to carelessly provide info. - // NOTE: this can change depending on what the server specifies! - // should we queue messages during a reconnect? This is important if the client/server connection is unstable - if (connectionInfo.enableSession && sessionManager is SessionManagerNoOp) { - sessionManager = SessionManagerFull(config, listenerManager as ListenerManager, aeronDriver, connectionInfo.sessionTimeout) - } else if (!connectionInfo.enableSession && sessionManager is SessionManagerFull) { - sessionManager = SessionManagerNoOp() - } /////////////// //// RMI @@ -772,12 +775,12 @@ open class Client(config: ClientConfiguration = ClientC ) val pubSub = clientConnection.connectionInfo - val logInfo = pubSub.getLogInfo(logger) - val connectionType = if (this is SessionClient) "session connection" else "connection" + + val logInfo = pubSub.getLogInfo(logger.isDebugEnabled) if (logger.isDebugEnabled) { - logger.debug("Creating new $connectionType to $logInfo") + logger.debug("Creating new buffered connection to $logInfo") } else { - logger.info("Creating new $connectionType to $logInfo") + logger.info("Creating new buffered connection to $logInfo") } val newConnection = newConnection(ConnectionParams( @@ -788,17 +791,16 @@ open class Client(config: ClientConfiguration = ClientC connectionInfo.secretKey )) - sessionManager.onNewConnection(newConnection) + bufferedManager?.onConnect(newConnection) if (!handshakeConnection.pubSub.isIpc) { // NOTE: Client can ALWAYS connect to the server. The server makes the decision if the client can connect or not. - val connType = if (newConnection is SessionConnection) "Session connection" else "Connection" if (logger.isTraceEnabled) { - logger.trace("[${handshakeConnection.details}] (${handshake.connectKey}) $connType (${newConnection.id}) adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") + logger.trace("[${handshakeConnection.details}] (${handshake.connectKey}) Buffered connection (${newConnection.id}) adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") } else if (logger.isDebugEnabled) { - logger.debug("[${handshakeConnection.details}] $connType (${newConnection.id}) adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") + logger.debug("[${handshakeConnection.details}] Buffered connection (${newConnection.id}) adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") } else if (logger.isInfoEnabled) { - logger.info("[${handshakeConnection.details}] $connType adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") + logger.info("[${handshakeConnection.details}] Buffered connection adding new signature for [$addressString -> ${connectionInfo.publicKey.toHexString()}]") } storage.addRegisteredServerKey(address!!, connectionInfo.publicKey) @@ -822,25 +824,18 @@ open class Client(config: ClientConfiguration = ClientC // finished with the handshake, so always close these! handshakeConnection.close(this) - val connType = if (newConnection is SessionConnection) "Session connection" else "Connection" if (logger.isTraceEnabled) { - logger.debug("[${handshakeConnection.details}] (${handshake.connectKey}) $connType (${newConnection.id}) done with handshake.") + logger.debug("[${handshakeConnection.details}] (${handshake.connectKey}) Buffered connection (${newConnection.id}) done with handshake.") } else if (logger.isDebugEnabled) { - logger.debug("[${handshakeConnection.details}] $connType (${newConnection.id}) done with handshake.") + logger.debug("[${handshakeConnection.details}] Buffered connection (${newConnection.id}) done with handshake.") } connection0 = newConnection newConnection.setImage() - // in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session - // when applicable - we ALSO want to restore RMI objects BEFORE the connection is fully setup! - val newSession = sessionManager.onInit(newConnection) - // before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls - if (newSession) { - listenerManager.notifyInit(newConnection) - } + listenerManager.notifyInit(newConnection) // this enables the connection to start polling for messages addConnection(newConnection) @@ -882,7 +877,7 @@ open class Client(config: ClientConfiguration = ClientC } val mustRestartDriverOnError = aeronDriver.internal.mustRestartDriverOnError - val dirtyDisconnectWithSession = (this@Client is SessionClient) && !shutdownEventPoller && connection.isDirtyClose() + val dirtyDisconnectWithSession = !shutdownEventPoller && connection.isDirtyClose() autoReconnect = mustRestartDriverOnError || dirtyDisconnectWithSession @@ -950,9 +945,7 @@ open class Client(config: ClientConfiguration = ClientC listenerManager.notifyConnect(newConnection) - if (!newSession) { - (newConnection as SessionConnection).sendPendingMessages() - } + newConnection.sendBufferedMessages() } /** @@ -1050,6 +1043,7 @@ open class Client(config: ClientConfiguration = ClientC */ fun close(closeEverything: Boolean = true) { stopConnectOnShutdown = true + bufferedManager?.close() close(closeEverything = closeEverything, sendDisconnectMessage = true, releaseWaitingThreads = true) } diff --git a/src/dorkbox/network/Configuration.kt b/src/dorkbox/network/Configuration.kt index 355fec8c..fbe3faed 100644 --- a/src/dorkbox/network/Configuration.kt +++ b/src/dorkbox/network/Configuration.kt @@ -70,12 +70,12 @@ class ServerConfiguration : dorkbox.network.Configuration() { } /** - * If a connection is in a temporal state (in the middle of a reconnect) and sessions are enabled -- then how long should we consider - * a new connection from the same client as part of the same session. + * If a connection is in a temporal state (in the middle of a reconnect) and a buffered connection is in use -- then how long should we consider + * a new connection from the same client as part of the same "session". * * The session timeout cannot be shorter than 60 seconds, and the server will send this configuration to the client */ - var sessionTimeoutSeconds = TimeUnit.MINUTES.toSeconds(2) + var bufferedConnectionTimeoutSeconds = TimeUnit.MINUTES.toSeconds(2) set(value) { require(!contextDefined) { errorMessage } field = value @@ -118,7 +118,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { config.listenIpAddress = listenIpAddress config.maxClientCount = maxClientCount config.maxConnectionsPerIpAddress = maxConnectionsPerIpAddress - config.sessionTimeoutSeconds = sessionTimeoutSeconds + config.bufferedConnectionTimeoutSeconds = bufferedConnectionTimeoutSeconds config.settingsStore = settingsStore super.copy(config) @@ -134,7 +134,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { if (listenIpAddress != other.listenIpAddress) return false if (maxClientCount != other.maxClientCount) return false if (maxConnectionsPerIpAddress != other.maxConnectionsPerIpAddress) return false - if (sessionTimeoutSeconds != other.sessionTimeoutSeconds) return false + if (bufferedConnectionTimeoutSeconds != other.bufferedConnectionTimeoutSeconds) return false if (settingsStore != other.settingsStore) return false return true @@ -145,7 +145,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { result = 31 * result + listenIpAddress.hashCode() result = 31 * result + maxClientCount result = 31 * result + maxConnectionsPerIpAddress - result = 31 * result + sessionTimeoutSeconds.hashCode() + result = 31 * result + bufferedConnectionTimeoutSeconds.hashCode() result = 31 * result + settingsStore.hashCode() return result } @@ -175,7 +175,6 @@ class ClientConfiguration : dorkbox.network.Configuration() { super.validate() // have to do some basic validation of our configuration - if (port != -1) { // this means it was configured! require(port > 0) { "Client listen port must be > 0" } diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 73dbda81..9d0c89e3 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -20,9 +20,7 @@ import dorkbox.network.aeron.* import dorkbox.network.connection.* import dorkbox.network.connection.IpInfo.Companion.IpListenType import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace -import dorkbox.network.connection.session.SessionConnection -import dorkbox.network.connection.session.SessionManagerFull -import dorkbox.network.connection.session.SessionServer +import dorkbox.network.connection.buffer.BufferManager import dorkbox.network.connectionType.ConnectionRule import dorkbox.network.exceptions.ServerException import dorkbox.network.handshake.ServerHandshake @@ -30,6 +28,7 @@ import dorkbox.network.handshake.ServerHandshakePollers import dorkbox.network.ipFilter.IpFilterRule import dorkbox.network.rmi.RmiSupportServer import org.slf4j.LoggerFactory +import java.net.InetAddress import java.util.concurrent.* /** @@ -103,16 +102,19 @@ open class Server(config: ServerConfiguration = ServerC @Volatile internal lateinit var handshake: ServerHandshake + /** + * Different connections (to the same client) can be "buffered", meaning that if they "go down" because of a network glitch -- the data + * being sent is not lost (it is buffered) and then re-sent once the new connection is established. References to the old connection + * will also redirect to the new connection. + */ + internal val bufferedManager: BufferManager + private val string0: String by lazy { "EndPoint [Server: ${storage.publicKey.toHexString()}]" } init { - if (this is SessionServer) { - // only set this if we need to - @Suppress("UNCHECKED_CAST") - sessionManager = SessionManagerFull(config, listenerManager as ListenerManager, aeronDriver, config.sessionTimeoutSeconds) - } + bufferedManager = BufferManager(config, listenerManager, aeronDriver, config.bufferedConnectionTimeoutSeconds) } final override fun newException(message: String, cause: Throwable?): Throwable { @@ -392,6 +394,7 @@ open class Server(config: ServerConfiguration = ServerC * @param closeEverything if true, all parts of the server will be closed (listeners, driver, event polling, etc) */ fun close(closeEverything: Boolean = true) { + bufferedManager.close() close(closeEverything = closeEverything, sendDisconnectMessage = true, releaseWaitingThreads = true) } diff --git a/src/dorkbox/network/aeron/AeronDriver.kt b/src/dorkbox/network/aeron/AeronDriver.kt index b3a662b8..071c0b11 100644 --- a/src/dorkbox/network/aeron/AeronDriver.kt +++ b/src/dorkbox/network/aeron/AeronDriver.kt @@ -907,20 +907,10 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo listenerManager.notifyError(exception) return false } - else if (connection.endPoint.sessionManager.enabled()) { - // if we are a SESSION, then the message will be placed into a queue to be re-sent once the connection comes back - // no extra actions required by us. Returning a "false" here makes sure that the session manager picks-up this message to - // re-broadcast (eventually) on the updated connection - return false - } else { - logger.info("[${publication.sessionId()}] Connection disconnected while sending data, closing connection.") - internal.mustRestartDriverOnError = true - - // publication was actually closed or the server was closed, so no bother throwing an error - connection.closeImmediately( - sendDisconnectMessage = false, closeEverything = false - ) - + else { + // by default, we BUFFER data on a connection -- so the message will be placed into a queue to be re-sent once the connection comes back + // no extra actions required by us. + // Returning a "false" here makes sure that the session manager picks-up this message to e-broadcast (eventually) on the updated connection return false } } @@ -947,15 +937,6 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to // send the message. - - if (!endPoint.shutdownInProgress.value && !endPoint.sessionManager.enabled()) { - // we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this) - // additionally, if we are managing pending messages, don't show an error (since the message will be queued to send again) - val exception = endPoint.newException( - "[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${errorCodeName(result)})" - ).cleanStackTrace(5) - listenerManager.notifyError(exception) - } return false } @@ -1053,15 +1034,6 @@ class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPo // this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's // done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to // send the message. - - if (!endPoint.shutdownInProgress.value && !endPoint.sessionManager.enabled()) { - // we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this) - // additionally, if we are managing pending messages, don't show an error (since the message will be queued to send again) - val exception = endPoint.newException( - "[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${errorCodeName(result)})" - ) - listenerManager.notifyError(exception) - } return false } diff --git a/src/dorkbox/network/aeron/AeronDriverInternal.kt b/src/dorkbox/network/aeron/AeronDriverInternal.kt index d4c44a86..166483f8 100644 --- a/src/dorkbox/network/aeron/AeronDriverInternal.kt +++ b/src/dorkbox/network/aeron/AeronDriverInternal.kt @@ -148,7 +148,6 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration // this is bad! We must close this connection. THIS WILL BE CALLED AS FAST AS THE CPU CAN RUN (because of how aeron works). if (!mustRestartDriverOnError) { - var restartNetwork = false // if the network interface is removed (for example, a VPN connection). diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index d81a4de5..83a0aa12 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -19,7 +19,7 @@ import dorkbox.network.Client import dorkbox.network.Server import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator -import dorkbox.network.connection.session.SessionConnection +import dorkbox.network.connection.buffer.BufferedSession import dorkbox.network.ping.Ping import dorkbox.network.rmi.RmiSupportConnection import io.aeron.Image @@ -32,10 +32,15 @@ import org.agrona.DirectBuffer import javax.crypto.SecretKey /** - * This connection is established once the registration information is validated, and the various connect/filter checks have passed + * This connection is established once the registration information is validated, and the various connect/filter checks have passed. + * + * Connections are also BUFFERED, meaning that if the connection between a client-server goes down because of a network glitch, then the + * data being sent is not lost (it is buffered) and then re-sent once a new connection has the same UUID within the timout period. + * + * References to the old connection will also redirect to the new connection. */ open class Connection(connectionParameters: ConnectionParams<*>) { - private var messageHandler: FragmentHandler + private val messageHandler: FragmentHandler /** * The specific connection details for this connection! @@ -106,6 +111,11 @@ open class Connection(connectionParameters: ConnectionParams<*>) { */ val isNetwork = !isIpc + /** + * used when the connection is buffered + */ + private val bufferedSession: BufferedSession + /** * The largest size a SINGLE message via AERON can be. Because the maximum size we can send in a "single fragment" is the * publication.maxPayloadLength() function (which is the MTU length less header). We could depend on Aeron for fragment reassembly, @@ -164,11 +174,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) { endPoint.dataReceive(buffer, offset, length, header, this@Connection) } + bufferedSession = when (endPoint) { + is Server -> endPoint.bufferedManager.onConnect(this) + is Client -> endPoint.bufferedManager!!.onConnect(this) + else -> throw RuntimeException("Unable to determine type, aborting!") + } + @Suppress("LeakingThis") rmi = endPoint.rmiConnectionSupport.getNewRmiSupport(this) // For toString() and logging - toString0 = info.getLogInfo(logger) + toString0 = info.getLogInfo(logger.isDebugEnabled) } /** @@ -200,9 +216,9 @@ open class Connection(connectionParameters: ConnectionParams<*>) { /** * Safely sends objects to a destination, if `abortEarly` is true, there are no retries if sending the message fails. * - * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! + * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! */ - internal open fun send(message: Any, abortEarly: Boolean): Boolean { + internal fun send(message: Any, abortEarly: Boolean): Boolean { if (logger.isTraceEnabled) { // The handshake sessionId IS NOT globally unique // don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time! @@ -210,7 +226,28 @@ open class Connection(connectionParameters: ConnectionParams<*>) { logger.trace("[$toString0] send: ${message.javaClass.simpleName} : $message") } } - return endPoint.write(message, publication, sendIdleStrategy, this@Connection, maxMessageSize, abortEarly) + + val success = endPoint.write(message, publication, sendIdleStrategy, this@Connection, maxMessageSize, abortEarly) + + return if (!success && message !is DisconnectMessage) { + // queue up the messages, because we couldn't write them for whatever reason! + // NEVER QUEUE THE DISCONNECT MESSAGE! + bufferedSession.queueMessage(this@Connection, message, abortEarly) + } else { + success + } + } + + private fun sendNoBuffer(message: Any): Boolean { + if (logger.isTraceEnabled) { + // The handshake sessionId IS NOT globally unique + // don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time! + if (logger.isTraceEnabled) { + logger.trace("[$toString0] send: ${message.javaClass.simpleName} : $message") + } + } + + return endPoint.write(message, publication, sendIdleStrategy, this@Connection, maxMessageSize, false) } /** @@ -282,6 +319,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) { return listenerManager.value?.notifyOnMessage(this, message) ?: false } + internal fun sendBufferedMessages() { + // now send all buffered/pending messages + if (logger.isDebugEnabled) { + logger.debug("Sending pending messages: ${bufferedSession.pendingMessagesQueue.size}") + } + + bufferedSession.pendingMessagesQueue.forEach { + sendNoBuffer(it) + } + } + /** * @return true if this connection has had close() called */ @@ -380,8 +428,20 @@ open class Connection(connectionParameters: ConnectionParams<*>) { } // make sure to save off the RMI objects for session management - if (!closeEverything && endPoint.sessionManager.enabled()) { - endPoint.sessionManager.onDisconnect(this as SessionConnection) + if (!closeEverything) { + when (endPoint) { + is Server -> endPoint.bufferedManager.onDisconnect(this) + is Client -> endPoint.bufferedManager!!.onDisconnect(this) + else -> throw RuntimeException("Unable to determine type, aborting!") + } + } + + if (!closeEverything) { + when (endPoint) { + is Server -> endPoint.bufferedManager.onDisconnect(this) + is Client -> endPoint.bufferedManager!!.onDisconnect(this) + else -> throw RuntimeException("Unable to determine type, aborting!") + } } // on close, we want to make sure this file is DELETED! diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 1e7212cc..2fc8a6a1 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -24,9 +24,6 @@ import dorkbox.network.ServerConfiguration import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.BacklogStat import dorkbox.network.aeron.EventPoller -import dorkbox.network.connection.session.SessionConnection -import dorkbox.network.connection.session.SessionManager -import dorkbox.network.connection.session.SessionManagerNoOp import dorkbox.network.connection.streaming.StreamingControl import dorkbox.network.connection.streaming.StreamingData import dorkbox.network.connection.streaming.StreamingManager @@ -182,14 +179,6 @@ abstract class EndPoint private constructor(val type: C private val streamingManager = StreamingManager(logger, config) - /** - * By default, this is a NO-OP version! We do not want if/else checks for every message! - * this can change of the lifespan of the CLIENT, depending on which/what server a single client connects to. - */ - @Volatile - internal var sessionManager: SessionManager = SessionManagerNoOp() - - /** * The primary machine port that the server will listen for connections on */ @@ -990,74 +979,70 @@ abstract class EndPoint private constructor(val type: C - if (logger.isDebugEnabled) { - logger.debug("Shutting down endpoint...") - } + if (logger.isDebugEnabled) { + logger.debug("Shutting down endpoint...") + } - // always do this. It is OK to run this multiple times - // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections - // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() - connections.forEach { - it.closeImmediately(sendDisconnectMessage = sendDisconnectMessage, closeEverything = closeEverything) - } + // always do this. It is OK to run this multiple times + // the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections + // inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect() + connections.forEach { + it.closeImmediately(sendDisconnectMessage = sendDisconnectMessage, closeEverything = closeEverything) + } - // this closes the endpoint specific instance running in the poller + // this closes the endpoint specific instance running in the poller - // THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER! - shutdownEventPoller = true + // THIS WILL SHUT DOWN THE EVENT POLLER IMMEDIATELY! BUT IN AN ASYNC MANNER! + shutdownEventPoller = true - // if we close the poller AND listener manager too quickly, events will not get published - // this waits for the ENDPOINT to finish running its tasks in the poller. - pollerClosedLatch.await() + // if we close the poller AND listener manager too quickly, events will not get published + // this waits for the ENDPOINT to finish running its tasks in the poller. + pollerClosedLatch.await() - // this will ONLY close the event dispatcher if ALL endpoints have closed it. - // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed. - networkEventPoller.close(logger, this) + // this will ONLY close the event dispatcher if ALL endpoints have closed it. + // when an endpoint closes, the poll-loop shuts down, and removes itself from the list of poll actions that need to be performed. + networkEventPoller.close(logger, this) - // Connections MUST be closed first, because we want to make sure that no RMI messages can be received - // when we close the RMI support objects (in which case, weird - but harmless - errors show up) - // IF CLOSED VIA RMI: this will wait for RMI timeouts if there are RMI in-progress. - if (sessionManager.enabled()) { - if (closeEverything) { - // only close out RMI if we are using session management AND we are closing everything! - responseManager.close(logger) - } - } else { - // no session management, so always clear this out. - responseManager.close(logger) - } + // Connections MUST be closed first, because we want to make sure that no RMI messages can be received + // when we close the RMI support objects (in which case, weird - but harmless - errors show up) + // IF CLOSED VIA RMI: this will wait for RMI timeouts if there are RMI in-progress. + if (closeEverything) { + // only close out RMI if we are closing everything! + responseManager.close(logger) + } - // don't do these things if we are "closed" from a client connection disconnect - // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done - if (closeEverything) { - // when the client connection is closed, we don't close the driver/etc. - // Clears out all registered events - listenerManager.close() + // don't do these things if we are "closed" from a client connection disconnect + // if there are any events going on, we want to schedule them to run AFTER all other events for this endpoint are done + if (closeEverything) { + // when the client connection is closed, we don't close the driver/etc. - // Remove from memory the data from the back-end storage - storage.close() - } + // Clears out all registered events + listenerManager.close() - // we might be restarting the aeron driver, so make sure it's closed. - aeronDriver.close() + // Remove from memory the data from the back-end storage + storage.close() + } - shutdown = true + // we might be restarting the aeron driver, so make sure it's closed. + aeronDriver.close() - // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on - shutdownLatch.countDown() - shutdownInProgress.lazySet(false) + shutdown = true - if (releaseWaitingThreads) { - logger.trace("Counting down the close latch...") - closeLatch.countDown() - } + // the shutdown here must be in the launchSequentially lambda, this way we can guarantee the driver is closed before we move on + shutdownLatch.countDown() + shutdownInProgress.lazySet(false) - logger.info("Done shutting down the endpoint.") + if (releaseWaitingThreads) { + logger.trace("Counting down the close latch...") + closeLatch.countDown() + } + + logger.info("Done shutting down the endpoint.") } /** diff --git a/src/dorkbox/network/connection/session/SessionManagerFull.kt b/src/dorkbox/network/connection/buffer/BufferManager.kt similarity index 64% rename from src/dorkbox/network/connection/session/SessionManagerFull.kt rename to src/dorkbox/network/connection/buffer/BufferManager.kt index 837ba2e4..b1c049c8 100644 --- a/src/dorkbox/network/connection/session/SessionManagerFull.kt +++ b/src/dorkbox/network/connection/buffer/BufferManager.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dorkbox.network.connection.session +package dorkbox.network.connection.buffer import dorkbox.bytes.ByteArrayWrapper import dorkbox.collections.LockFreeHashMap @@ -30,24 +30,23 @@ import net.jodah.expiringmap.ExpiringMap import org.slf4j.LoggerFactory import java.util.concurrent.* -internal open class SessionManagerFull( +internal open class BufferManager( config: Configuration, listenerManager: ListenerManager, - val aeronDriver: AeronDriver, + aeronDriver: AeronDriver, sessionTimeout: Long -): SessionManager { +) { companion object { - private val logger = LoggerFactory.getLogger(SessionManagerFull::class.java.simpleName) + private val logger = LoggerFactory.getLogger(BufferManager::class.java.simpleName) } - - private val sessions = LockFreeHashMap>() - - - private val expiringSessions: ExpiringMap> + private val sessions = LockFreeHashMap() + private val expiringSessions: ExpiringMap init { + require(sessionTimeout >= 60) { "The buffered connection timeout 'bufferedConnectionTimeoutSeconds' must be greater than 60 seconds!" } + // ignore 0 val check = TimeUnit.SECONDS.toNanos(sessionTimeout) val lingerNs = aeronDriver.lingerNs() @@ -62,7 +61,7 @@ internal open class SessionManagerFull( expiringSessions = ExpiringMap.builder() .expiration(sessionTimeout, timeUnit) .expirationPolicy(ExpirationPolicy.CREATED) - .expirationListener> { publicKeyWrapped, sessionConnection -> + .expirationListener { publicKeyWrapped, sessionConnection -> // this blocks until it fully runs (which is ok. this is fast) logger.debug("Connection session expired for: ${publicKeyWrapped.bytes.toHexString()}") @@ -72,82 +71,60 @@ internal open class SessionManagerFull( .build() } - override fun enabled(): Boolean { - return true - } - /** * this must be called when a new connection is created * * @return true if this is a new session, false if it is an existing session */ - override fun onNewConnection(connection: Connection) { - require(connection is SessionConnection) { "The new connection does not inherit a SessionConnection, unable to continue. " } - + fun onConnect(connection: Connection): BufferedSession { val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid) - synchronized(sessions) { + return synchronized(sessions) { // always check if we are expiring first... val expiring = expiringSessions.remove(publicKeyWrapped) if (expiring != null) { - // we must always set this session value!! - connection.session = expiring + expiring.connection = connection + expiring } else { val existing = sessions[publicKeyWrapped] if (existing != null) { // we must always set this session value!! - connection.session = existing + existing.connection = connection + existing } else { - @Suppress("UNCHECKED_CAST") - val newSession = (connection.endPoint as SessionEndpoint).newSession(connection as CONNECTION) + val newSession = BufferedSession(connection) + sessions[publicKeyWrapped] = newSession // we must always set this when the connection is created, and it must be inside the sync block! - connection.session = newSession - - sessions[publicKeyWrapped] = newSession + newSession } } } } - - /** - * this must be called when a new connection is created AND when the internal `reconnect` occurs (as a result of a network error) - * - * @return true if this is a new session, false if it is an existing session - */ - @Suppress("UNCHECKED_CAST") - override fun onInit(connection: Connection): Boolean { - // we know this will always be the case, because if this specific method can be called, then it will be a sessionConnection - connection as SessionConnection - - val session: Session = connection.session as Session - session.restore(connection as CONNECTION) - - // the FIRST time this method is called, it will be true. EVERY SUBSEQUENT TIME, it will be false - return session.isNewSession - } - - /** * Always called when a connection is disconnected from the network */ - override fun onDisconnect(connection: CONNECTION) { + fun onDisconnect(connection: Connection) { try { val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid) - val session = synchronized(sessions) { + synchronized(sessions) { val sess = sessions.remove(publicKeyWrapped) // we want to expire this session after XYZ time expiringSessions[publicKeyWrapped] = sess - sess } - - - session!!.save(connection) } catch (e: Exception) { - logger.error("Unable to run save data for the session!", e) + logger.error("Unable to run session expire logic!", e) + } + } + + + fun close() { + synchronized(sessions) { + sessions.clear() + expiringSessions.clear() } } } diff --git a/src/dorkbox/network/connection/buffer/BufferedSession.kt b/src/dorkbox/network/connection/buffer/BufferedSession.kt new file mode 100644 index 00000000..d80f4cb1 --- /dev/null +++ b/src/dorkbox/network/connection/buffer/BufferedSession.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.connection.buffer + +import dorkbox.network.connection.Connection +import java.util.concurrent.* + +open class BufferedSession(@Volatile var connection: Connection) { + /** + * Only used when configured. Will re-send all missing messages to a connection when a connection re-connects. + */ + val pendingMessagesQueue: LinkedTransferQueue = LinkedTransferQueue() + + fun queueMessage(connection: Connection, message: Any, abortEarly: Boolean): Boolean { + if (this.connection != connection) { + connection.logger.trace("[{}] message received on old connection, resending", connection) + + // we received a message on an OLD connection (which is no longer connected ---- BUT we have a NEW connection that is connected) + // this can happen on RMI object that are old + val success = this.connection.send(message, abortEarly) + if (success) { + connection.logger.trace("[{}] successfully resent message", connection) + return true + } + } + + if (!abortEarly) { + // this was a "normal" send (instead of the disconnect message). + pendingMessagesQueue.put(message) + connection.logger.trace("[{}] queueing message", connection) + } + else if (connection.endPoint.aeronDriver.internal.mustRestartDriverOnError) { + // the only way we get errors, is if the connection is bad OR if we are sending so fast that the connection cannot keep up. + + // don't restart/reconnect -- there was an internal network error + pendingMessagesQueue.put(message) + connection.logger.trace("[{}] queueing message", connection) + } + else if (!connection.isClosedWithTimeout()) { + // there was an issue - the connection should automatically reconnect + pendingMessagesQueue.put(message) + connection.logger.trace("[{}] queueing message", connection) + } + + return false + } +} diff --git a/src/dorkbox/network/connection/session/package-info.java b/src/dorkbox/network/connection/buffer/package-info.java similarity index 93% rename from src/dorkbox/network/connection/session/package-info.java rename to src/dorkbox/network/connection/buffer/package-info.java index 547d29a8..a291b834 100644 --- a/src/dorkbox/network/connection/session/package-info.java +++ b/src/dorkbox/network/connection/buffer/package-info.java @@ -14,4 +14,4 @@ * limitations under the License. */ -package dorkbox.network.connection.session; +package dorkbox.network.connection.buffer; diff --git a/src/dorkbox/network/connection/session/Session.kt b/src/dorkbox/network/connection/session/Session.kt deleted file mode 100644 index f7facffc..00000000 --- a/src/dorkbox/network/connection/session/Session.kt +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.rmi.RemoteObject -import kotlinx.atomicfu.locks.ReentrantLock -import kotlinx.atomicfu.locks.withLock -import java.util.concurrent.* - -open class Session(@Volatile var connection: CONNECTION) { - - - // the RMI objects are saved when the connection is removed, and restored BEFORE the connection is initialized, so there are no concerns - // regarding the collision of RMI IDs and objects - private val lock = ReentrantLock() - private var oldProxyObjects: List>? = null - private var oldProxyCallbacks: List Unit>>? = null - private var oldImplObjects: List>? = null - - /** - * Only used when configured. Will re-send all missing messages to a connection when a connection re-connects. - */ - val pendingMessagesQueue: LinkedTransferQueue = LinkedTransferQueue() - - - /** - * the FIRST time this method is called, it will be true. EVERY SUBSEQUENT TIME, it will be false - */ - @Volatile - internal var isNewSession = true - private set - get() { - val orig = field - if (orig) { - field = false - newSession = false - } - return orig - } - - - @Volatile - private var newSession = true - - - fun restore(connection: CONNECTION) { - this.connection = connection - - if (newSession) { - connection.logger.debug("[{}] Session connection established", connection) - return - } - - connection.logger.debug("[{}] Restoring session connection", connection) - - lock.withLock { - val oldProxyObjects = oldProxyObjects - val oldProxyCallbacks = oldProxyCallbacks - val oldImplObjects = oldImplObjects - - // this is called, even on a brand-new session, so we must have extra checks in place. - val rmi = connection.rmi - if (oldProxyObjects != null) { - rmi.recreateProxyObjects(oldProxyObjects) - this.oldProxyObjects = null - } - if (oldProxyCallbacks != null) { - rmi.restoreCallbacks(oldProxyCallbacks) - this.oldProxyCallbacks = null - } - if (oldImplObjects != null) { - rmi.restoreImplObjects(oldImplObjects) - this.oldImplObjects = null - } - } - } - - fun save(connection: CONNECTION) { - connection.logger.debug("[{}] Saving session connection", connection) - - val rmi = connection.rmi - val allProxyObjects = rmi.getAllProxyObjects() - val allProxyCallbacks = rmi.getAllCallbacks() - val allImplObjects = rmi.getAllImplObjects() - - // we want to save all the connection RMI objects, so they can be recreated on connect - lock.withLock { - oldProxyObjects = allProxyObjects - oldProxyCallbacks = allProxyCallbacks - oldImplObjects = allImplObjects - } - } - - fun queueMessage(connection: SessionConnection, message: Any, abortEarly: Boolean): Boolean { - if (this.connection != connection) { - connection.logger.trace("[{}] message received on old connection, resending", connection) - - // we received a message on an OLD connection (which is no longer connected ---- BUT we have a NEW connection that is connected) - // this can happen on RMI object that are old - val success = this.connection.send(message, abortEarly) - if (success) { - connection.logger.trace("[{}] successfully resent message", connection) - return true - } - } - - if (!abortEarly) { - // this was a "normal" send (instead of the disconnect message). - pendingMessagesQueue.put(message) - connection.logger.trace("[{}] queueing message", connection) - } - else if (connection.endPoint.aeronDriver.internal.mustRestartDriverOnError) { - // the only way we get errors, is if the connection is bad OR if we are sending so fast that the connection cannot keep up. - - // don't restart/reconnect -- there was an internal network error - pendingMessagesQueue.put(message) - connection.logger.trace("[{}] queueing message", connection) - } - else if (!connection.isClosedWithTimeout()) { - // there was an issue - the connection should automatically reconnect - pendingMessagesQueue.put(message) - connection.logger.trace("[{}] queueing message", connection) - } - - return false - } -} diff --git a/src/dorkbox/network/connection/session/SessionClient.kt b/src/dorkbox/network/connection/session/SessionClient.kt deleted file mode 100644 index beabfa97..00000000 --- a/src/dorkbox/network/connection/session/SessionClient.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.Client -import dorkbox.network.ClientConfiguration -import dorkbox.network.connection.ConnectionParams - -open class SessionClient(config: ClientConfiguration = ClientConfiguration(), loggerName: String = Client::class.java.simpleName): - Client(config, loggerName), SessionEndpoint { - - override fun newConnection(connectionParameters: ConnectionParams): CONNECTION { - @Suppress("UNCHECKED_CAST") - return SessionConnection(connectionParameters) as CONNECTION - } - - override fun newSession(connection: CONNECTION): Session { - return Session(connection) - } -} diff --git a/src/dorkbox/network/connection/session/SessionConnection.kt b/src/dorkbox/network/connection/session/SessionConnection.kt deleted file mode 100644 index df7f7970..00000000 --- a/src/dorkbox/network/connection/session/SessionConnection.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.connection.Connection -import dorkbox.network.connection.ConnectionParams - -open class SessionConnection(connectionParameters: ConnectionParams<*>): Connection(connectionParameters) { - @Volatile - lateinit var session: Session<*> - - override fun send(message: Any, abortEarly: Boolean): Boolean { - val success = super.send(message, abortEarly) - if (!success) { - return session.queueMessage(this, message, abortEarly) - } - - return true - } - - internal fun sendPendingMessages() { - // now send all pending messages - if (logger.isDebugEnabled) { - logger.debug("Sending pending messages: ${session.pendingMessagesQueue.size}") - } - session.pendingMessagesQueue.forEach { - super.send(it, false) - } - } -} diff --git a/src/dorkbox/network/connection/session/SessionEndpoint.kt b/src/dorkbox/network/connection/session/SessionEndpoint.kt deleted file mode 100644 index 378b7148..00000000 --- a/src/dorkbox/network/connection/session/SessionEndpoint.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.connection.ConnectionParams - -interface SessionEndpoint { - fun newConnection(connectionParameters: ConnectionParams): CONNECTION - fun newSession(connection: CONNECTION): Session -} diff --git a/src/dorkbox/network/connection/session/SessionManager.kt b/src/dorkbox/network/connection/session/SessionManager.kt deleted file mode 100644 index 4b64018b..00000000 --- a/src/dorkbox/network/connection/session/SessionManager.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.connection.Connection - -interface SessionManager { - - fun enabled(): Boolean - fun onNewConnection(connection: Connection) - fun onInit(connection: Connection): Boolean - fun onDisconnect(connection: CONNECTION) -} diff --git a/src/dorkbox/network/connection/session/SessionManagerNoOp.kt b/src/dorkbox/network/connection/session/SessionManagerNoOp.kt deleted file mode 100644 index 70ca77b8..00000000 --- a/src/dorkbox/network/connection/session/SessionManagerNoOp.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.connection.Connection - -class SessionManagerNoOp: SessionManager { - override fun enabled(): Boolean { - return false - } - - override fun onNewConnection(connection: Connection) { - // do nothing - } - - override fun onInit(connection: Connection): Boolean { - // do nothing - return true - } - - override fun onDisconnect(connection: CONNECTION) { - // do nothing - } -} diff --git a/src/dorkbox/network/connection/session/SessionServer.kt b/src/dorkbox/network/connection/session/SessionServer.kt deleted file mode 100644 index 028a16f9..00000000 --- a/src/dorkbox/network/connection/session/SessionServer.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkbox.network.connection.session - -import dorkbox.network.Server -import dorkbox.network.ServerConfiguration -import dorkbox.network.connection.ConnectionParams - -open class SessionServer(config: ServerConfiguration = ServerConfiguration(), loggerName: String = Server::class.java.simpleName): - Server(config, loggerName), SessionEndpoint { - override fun newConnection(connectionParameters: ConnectionParams): CONNECTION { - @Suppress("UNCHECKED_CAST") - return SessionConnection(connectionParameters) as CONNECTION - } - - override fun newSession(connection: CONNECTION): Session { - return Session(connection) - } -} diff --git a/src/dorkbox/network/rmi/RmiClient.kt b/src/dorkbox/network/rmi/RmiClient.kt index 44f5b2a0..c68808f9 100644 --- a/src/dorkbox/network/rmi/RmiClient.kt +++ b/src/dorkbox/network/rmi/RmiClient.kt @@ -274,9 +274,7 @@ internal class RmiClient(val isGlobal: Boolean, val success = connection.send(invokeMethod) if (!success) { - if (!connection.endPoint.sessionManager.enabled()) { - throw RmiException("Unable to send async message, an error occurred during the send process") - } + throw RmiException("Unable to send async message, an error occurred during the send process") } // if we are async then we return immediately (but must return the correct type!) @@ -310,10 +308,8 @@ internal class RmiClient(val isGlobal: Boolean, val success = connection.send(invokeMethod) if (!success) { - if (!connection.endPoint.sessionManager.enabled()) { - responseManager.abort(responseWaiter, logger) - throw RmiException("Unable to send message, an error occurred during the send process") - } + responseManager.abort(responseWaiter, logger) + throw RmiException("Unable to send message, an error occurred during the send process") } diff --git a/src/module-info.java b/src/module-info.java index 806b4680..0420371c 100644 --- a/src/module-info.java +++ b/src/module-info.java @@ -3,7 +3,7 @@ module dorkbox.network { exports dorkbox.network.aeron; exports dorkbox.network.connection; exports dorkbox.network.connection.streaming; - exports dorkbox.network.connection.session; + exports dorkbox.network.connection.buffer; exports dorkbox.network.connectionType; exports dorkbox.network.exceptions; exports dorkbox.network.handshake; diff --git a/test/dorkboxTest/network/SessionReconnectTest.kt b/test/dorkboxTest/network/SessionReconnectTest.kt deleted file mode 100644 index 98280ff9..00000000 --- a/test/dorkboxTest/network/SessionReconnectTest.kt +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dorkboxTest.network - -import dorkbox.network.Client -import dorkbox.network.Server -import dorkbox.network.connection.Connection -import dorkbox.network.connection.session.SessionClient -import dorkbox.network.connection.session.SessionConnection -import dorkbox.network.connection.session.SessionServer -import dorkbox.network.rmi.RemoteObject -import dorkboxTest.network.rmi.cows.TestCow -import dorkboxTest.network.rmi.cows.TestCowImpl -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.junit.Assert -import org.junit.Test -import java.util.concurrent.* - - -class MessageToContinue - -class SessionReconnectTest: BaseTest() { - @Test - fun rmiReconnectSessions() { - val server = run { - val configuration = serverConfig() - - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - configuration.serialization.register(MessageToContinue::class.java) - configuration.serialization.register(UnsupportedOperationException::class.java) - - // for Client -> Server RMI - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - - val server = SessionServer(configuration) - - addEndPoint(server) - - server.onMessage { m -> - send(MessageToContinue()) - } - - server - } - - val client = run { - val configuration = clientConfig() - - - val client = SessionClient(configuration) - - addEndPoint(client) - - var rmiId = 0 - - client.onConnect { - logger.error("Connecting") - - rmi.create(23) { - rmiId = it - moo("Client -> Server") - - this@onConnect.send(MessageToContinue()) - } - } - - client.onMessage { _ -> - val get = rmi.get(rmiId) - RemoteObject.cast(get).responseTimeout = 50_000 - - get.moo("NOT CRASHED!") - - val latch = CountDownLatch(1) - - GlobalScope.launch { - latch.await() - get.moo("DELAYED AND NOT CRASHED!") - stopEndPoints() - } - - client.close(false) - client.waitForClose() - client.connectIpc() // reconnect - latch.countDown() - } - - client - } - - server.bindIpc() - client.connectIpc() - - waitForThreads() - } - - @Test - fun rmiReconnectReplayMessages() { - val server = run { - val configuration = serverConfig() - - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - configuration.serialization.register(MessageToContinue::class.java) - configuration.serialization.register(UnsupportedOperationException::class.java) - - // for Client -> Server RMI - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - - val server = SessionServer(configuration) - - addEndPoint(server) - - server.onMessage { m -> - send(MessageToContinue()) - } - - server - } - - val client = run { - val configuration = clientConfig() - - - val client = SessionClient(configuration) - - addEndPoint(client) - - var rmiId = 0 - - client.onConnect { - logger.error("Connecting") - - rmi.create(23) { - rmiId = it - moo("Client -> Server") - - this@onConnect.send(MessageToContinue()) - } - } - - client.onMessage { _ -> - logger.error("Starting reconnect bits") - val obj = rmi.get(rmiId) - - // closing client - client.close(false) - client.waitForClose() - - logger.error("Getting object again. it should be the cached version") - val cast = RemoteObject.cast(obj) - cast.responseTimeout = 50_000 - - GlobalScope.launch { - delay(1000) // must be shorter than the RMI timeout for our tests - - client.connectIpc() - - delay(1000) // give some time for the messages to go! If we close instantly, the return RMI message will fail - stopEndPoints() - } - - // this is SYNC, so it waits for a response! - try { - cast.async = false - obj.moo("DELAYED AND NOT CRASHED!") // will wait for a response - } - catch (e: Exception) { - e.printStackTrace() - e.cause?.printStackTrace() - Assert.fail(".moo() should not throw an exception, because it will succeed before the timeout") - } - } - - client - } - - server.bindIpc() - client.connectIpc() - - waitForThreads() - } - - @Test - fun rmiReconnectSessionsFail() { - var rmiId = 0 - - val server = run { - val configuration = serverConfig() - - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - configuration.serialization.register(MessageToContinue::class.java) - configuration.serialization.register(UnsupportedOperationException::class.java) - - // for Client -> Server RMI - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - - val server = SessionServer(configuration) - - addEndPoint(server) - - server.onMessage { m -> - rmi.delete(rmiId) - - // NOTE: if we send an RMI object, it will automatically be saved! - send(MessageToContinue()) - } - - server - } - - val client = run { - val configuration = clientConfig() - - val client = SessionClient(configuration) - - addEndPoint(client) - - var firstTime = true - - client.onConnect { - if (firstTime) { - firstTime = false - logger.error("Connecting") - - rmi.create(23) { - rmiId = it - moo("Client -> Server") - this@onConnect.send(MessageToContinue()) - } - } - } - - client.onMessage { _ -> - val obj = rmi.get(rmiId) - val o2 = RemoteObject.cast(obj) - - GlobalScope.launch { - delay(4000) - - try { - o2.sync { - obj.moo("DELAYED AND NOT CRASHED!") - Assert.fail(".moo() should throw an timeout exception, the backing RMI object doesn't exist!") - } - } - catch (ignored: Exception) { - } - - stopEndPoints() - } - - client.close(false) - client.connectIpc() - } - - client - } - - server.bindIpc() - client.connectIpc() - - waitForThreads() - } - - @Test - fun rmiReconnectSessionsFail2() { - - var rmiId = 0 - - val server = run { - val configuration = serverConfig() - - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - configuration.serialization.register(MessageToContinue::class.java) - configuration.serialization.register(UnsupportedOperationException::class.java) - - // for Client -> Server RMI - configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java) - - val server = Server(configuration) - - addEndPoint(server) - - server.onMessage { m -> - rmi.delete(rmiId) - - // NOTE: if we send an RMI object, it will automatically be saved! - send(MessageToContinue()) - } - - server - } - - val client = run { - val configuration = clientConfig() - - - val client = Client(configuration) - - addEndPoint(client) - - var firstTime = true - - client.onConnect { - if (firstTime) { - firstTime = false - logger.error("Connecting") - - rmi.create(23) { - rmiId = it - moo("Client -> Server") - - this@onConnect.send(MessageToContinue()) - } - } - } - - client.onMessage { _ -> - val obj = rmi.get(rmiId) - val o2 = RemoteObject.cast(obj) - o2.responseTimeout = 50_000 - - GlobalScope.launch { - delay(4000) - - try { - o2.sync { - obj.moo("CRASHED!") - Assert.fail(".moo() should throw an timeout exception, the backing RMI object doesn't exist!") - } - } - catch (e: Exception) { - logger.error("Successfully caught error!") - } - - stopEndPoints() - } - - client.close(false) - client.connectIpc() - } - - client - } - - server.bindIpc() - client.connectIpc() - - waitForThreads() - } -} diff --git a/test/dorkboxTest/network/app/AeronClientServerSessionForever.kt b/test/dorkboxTest/network/app/AeronClientServerSessionForever.kt deleted file mode 100644 index 02d7606f..00000000 --- a/test/dorkboxTest/network/app/AeronClientServerSessionForever.kt +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Copyright 2023 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkboxTest.network.app - -import ch.qos.logback.classic.Level -import ch.qos.logback.classic.Logger -import ch.qos.logback.classic.encoder.PatternLayoutEncoder -import ch.qos.logback.classic.joran.JoranConfigurator -import ch.qos.logback.classic.spi.ILoggingEvent -import ch.qos.logback.core.ConsoleAppender -import dorkbox.netUtil.IPv4 -import dorkbox.network.ClientConfiguration -import dorkbox.network.ServerConfiguration -import dorkbox.network.connection.session.SessionClient -import dorkbox.network.connection.session.SessionConnection -import dorkbox.network.connection.session.SessionServer -import dorkbox.network.ipFilter.IpSubnetFilterRule -import dorkbox.storage.Storage -import dorkbox.util.Sys -import kotlinx.atomicfu.atomic -import org.agrona.concurrent.SigInt -import org.slf4j.LoggerFactory -import sun.misc.Unsafe -import java.lang.reflect.Field - -/** - * THIS WILL RUN FOREVER. It is primarily used for profiling. - */ -class AeronClientServerSessionForever { - companion object { - init { - try { - val theUnsafe = Unsafe::class.java.getDeclaredField("theUnsafe") - theUnsafe.isAccessible = true - val u = theUnsafe.get(null) as Unsafe - val cls = Class.forName("jdk.internal.module.IllegalAccessLogger") - val logger: Field = cls.getDeclaredField("logger") - u.putObjectVolatile(cls, u.staticFieldOffset(logger), null) - } catch (e: NoSuchFieldException) { - e.printStackTrace() - } catch (e: IllegalAccessException) { - e.printStackTrace() - } catch (e: ClassNotFoundException) { - e.printStackTrace() - } - - // assume SLF4J is bound to logback in the current environment - val rootLogger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger - val context = rootLogger.loggerContext - val jc = JoranConfigurator() - jc.context = context - context.reset() // override default configuration - -// rootLogger.setLevel(Level.OFF); - - // rootLogger.setLevel(Level.INFO); -// rootLogger.level = Level.DEBUG - rootLogger.level = Level.TRACE -// rootLogger.setLevel(Level.ALL); - - - // we only want error messages - val nettyLogger = LoggerFactory.getLogger("io.netty") as Logger - nettyLogger.level = Level.ERROR - - // we only want error messages - val kryoLogger = LoggerFactory.getLogger("com.esotericsoftware") as Logger - kryoLogger.level = Level.ERROR - - - val encoder = PatternLayoutEncoder() - encoder.context = context - encoder.pattern = "%date{HH:mm:ss.SSS} %-5level [%logger{35}] %msg%n" - encoder.start() - - val consoleAppender = ConsoleAppender() - consoleAppender.context = context - consoleAppender.encoder = encoder - consoleAppender.start() - - rootLogger.addAppender(consoleAppender) - } - - /** - * Command-line entry point. - * - * @param args Command-line arguments - * - * @throws Exception On any error - */ - @Throws(Exception::class) - @JvmStatic - fun main(args: Array) { - val acs = AeronClientServerSessionForever() - - if (args.contains("server")) { - val server = acs.server() - server.waitForClose() - } else { - val client = acs.client("172.31.70.86") - client.waitForClose() - } - } - } - - - fun client(remoteAddress: String): SessionClient { - val count = atomic(0L) - val time = Stopwatch.createUnstarted() - - val configuration = ClientConfiguration() - configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! - configuration.appId = "aeron_test" - - configuration.enableIpc = false -// configuration.enableIPv4 = false - configuration.enableIPv6 = false - -// configuration.uniqueAeronDirectory = true - - val client = SessionClient(configuration) - - client.onInit { - logger.error("initialized") - } - - client.onConnect { - logger.error("connected") - time.start() - send(NoGarbageObj()) - } - - client.onDisconnect { - logger.error("disconnect") - } - - client.onError { throwable -> - logger.error("has error") - throwable.printStackTrace() - } - - client.onMessage { message -> - val andIncrement = count.getAndIncrement() - if ((andIncrement % 10) == 0L) { - logger.error("Sending messages: $andIncrement") - } - if (andIncrement > 0 && (andIncrement % 500000) == 0L) { - // we are measuring roundtrip performance - logger.error("For 1,000,000 messages: ${Sys.getTimePrettyFull(time.elapsedNanos())}") - time.reset() - time.start() - } - - val success = send(message) - logger.error("SUCCESS:: $success") - } - - client.connect(remoteAddress, 2000) // UDP connection via loopback - - return client - - // different ones needed - // send - reliable - // send - unreliable - // send - priority (0-255 -- 255 is MAX priority) when sending, max is always sent immediately, then lower priority is sent if there is no backpressure from the MediaDriver. - // send - IPC/local -// runBlocking { -// while (!client.isShutdown()) { -// client.send("ECHO " + java.lang.Long.toUnsignedString(client.crypto.secureRandom.nextLong(), 16)) -// } -// } - - - // connection needs to know - // is UDP or IPC - // host address - - // RMI - // client.get(5) -> gets from the server connection, if exists, then global. - // on server, a connection local RMI object "uses" an id for global, so there will never be a conflict - // using some tricks, we can make it so that it DOESN'T matter the order in which objects are created, - // and can specify, if we want, the object created. - // Once created though, as NEW ONE with the same ID cannot be created until the old one is removed! - -// Thread.sleep(2000L) -// client.close() - } - - - fun server(): SessionServer { - val configuration = ServerConfiguration() - configuration.settingsStore = Storage.Memory() // don't want to persist anything on disk! - configuration.listenIpAddress = "*" - configuration.maxClientCount = 50 - configuration.appId = "aeron_test" - - configuration.enableIpc = false -// configuration.enableIPv4 = false - configuration.enableIPv6 = false - - configuration.maxConnectionsPerIpAddress = 50 - configuration.serialization.register(NoGarbageObj::class.java, NGOSerializer()) - - val server = SessionServer(configuration) - - // we must always make sure that aeron is shut-down before starting again. - if (!server.ensureStopped()) { - throw IllegalStateException("Aeron was unable to shut down in a timely manner.") - } - - server.filter(IpSubnetFilterRule(IPv4.LOCALHOST, 32)) - - server.filter { - println("should the connection $this be allowed?") - true - } - - server.onInit { - logger.error("initialized") - } - - server.onConnect { - logger.error("connected: $this") - } - - server.onDisconnect { - logger.error("disconnect: $this") - } - - server.onErrorGlobal { throwable -> - server.logger.error("from test: has error") - throwable.printStackTrace() - } - - server.onError { throwable -> - logger.error("from test: has connection error: $this") - throwable.printStackTrace() - } - - server.onMessage { message -> - // bounce back message - val success = send(message) - logger.error("SUCCESS:: $success") - } - - var closeCalled = false - SigInt.register { - // only close once - if (!closeCalled) { - closeCalled = true - server.logger.info("Shutting down via sig-int command") - server.close() - } - } - - server.bind(2000) - - - return server - } -}