Cleaned up sessions

This commit is contained in:
Robinson 2023-09-22 15:53:46 +02:00
parent c0227fee06
commit 8bbaa6df18
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
11 changed files with 71 additions and 101 deletions

View File

@ -791,8 +791,12 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
storage.addRegisteredServerKey(address!!, connectionInfo.publicKey)
}
connection0 = newConnection
addConnection(newConnection)
// in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session
var newSession = true
if (sessionManager.enabled()) {
newSession = sessionManager.onInit(newConnection as SessionConnection)
}
// tell the server our connection handshake is done, and the connection can now listen for data.
// also closes the handshake (will also throw connect timeout exception)
@ -816,17 +820,13 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
newConnection.setImage()
// in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session
var newSession = true
if (sessionManager.enabled()) {
newSession = sessionManager.onInit(newConnection as SessionConnection)
}
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
if (newSession) {
listenerManager.notifyInit(newConnection)
}
connection0 = newConnection
addConnection(newConnection)
// if we shutdown/close before the poller starts, we don't want to block forever
pollerClosedLatch = CountDownLatch(1)

View File

@ -16,6 +16,7 @@
package dorkbox.network.connection
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator
import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
import dorkbox.network.connection.session.SessionConnection
@ -390,7 +391,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val connection = this
endPoint.ifServer {
if (endPoint.isServer()) {
// clean up the resources associated with this connection when it's closed
if (logger.isDebugEnabled) {
logger.debug("[${connection}] freeing resources")
@ -403,7 +404,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
if (remoteAddress != null) {
// unique for UDP endpoints
handshake.connectionsPerIpCounts.decrementSlow(remoteAddress)
(endPoint as Server).handshake.connectionsPerIpCounts.decrementSlow(remoteAddress)
}
}

View File

@ -299,16 +299,12 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
internal fun ifServer(function: Server<CONNECTION>.() -> Unit) {
if (type == Server::class.java) {
function(this as Server<CONNECTION>)
}
internal fun isServer(): Boolean {
return type === Server::class.java
}
internal fun ifClient(function: Client<CONNECTION>.() -> Unit) {
if (type == Client::class.java) {
function(this as Client<CONNECTION>)
}
internal fun isClient(): Boolean {
return type === Client::class.java
}
/**

View File

@ -16,17 +16,12 @@
package dorkbox.network.connection.session
import dorkbox.hex.toHexString
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RmiClient
import kotlinx.atomicfu.locks.ReentrantLock
import kotlinx.atomicfu.locks.withLock
import java.lang.reflect.Proxy
import java.util.concurrent.*
class Session<CONNECTION: Connection>(val aeronDriver: AeronDriver) {
open class Session<CONNECTION: SessionConnection> {
// the RMI objects are saved when the connection is removed, and restored BEFORE the connection is initialized, so there are no concerns
// regarding the collision of RMI IDs and objects
private val lock = ReentrantLock()
@ -36,47 +31,10 @@ class Session<CONNECTION: Connection>(val aeronDriver: AeronDriver) {
/**
* Only used when configured. Will re-send all missing messages to a connection when a connection re-connects.
*/
internal val pendingMessagesQueue: LinkedTransferQueue<Any> = LinkedTransferQueue()
val pendingMessagesQueue: LinkedTransferQueue<Any> = LinkedTransferQueue()
@Volatile
private var connection: CONNECTION? = null
fun queueMessage(connection: SessionConnection, message: Any, abortEarly: Boolean): Boolean {
val existingConnection = this.connection
if (existingConnection != null && existingConnection != connection) {
// we received a message on an OLD connection (which is no longer connected ---- BUT we have a NEW connection that is connected)
val success = existingConnection.send(message, abortEarly)
if (success) {
return true
}
}
if (!abortEarly) {
// this was a "normal" send (instead of the disconnect message).
pendingMessagesQueue.put(message)
}
else if (aeronDriver.internal.mustRestartDriverOnError) {
// the only way we get errors, is if the connection is bad OR if we are sending so fast that the connection cannot keep up.
// don't restart/reconnect -- there was an internal network error
pendingMessagesQueue.put(message)
}
else if (existingConnection == null || !existingConnection.isConnected()) {
// there was an issue - the connection should automatically reconnect
pendingMessagesQueue.put(message)
}
// we couldn't send the message, but we did queue it if possible
return false
}
fun restore(connection: CONNECTION) {
connection.logger.error("RESTORING RMI OBJECTS for ${connection.uuid.toHexString()}")
this.connection = connection
lock.withLock {
// this is called, even on a brand-new session, so we must have extra checks in place.
val rmi = connection.rmi
@ -85,39 +43,37 @@ class Session<CONNECTION: Connection>(val aeronDriver: AeronDriver) {
oldProxyObjects = null
}
if (oldImplObjects != null) {
rmi.restoreAllImplObjects(oldImplObjects!!)
rmi.restoreImplObjects(oldImplObjects!!)
oldImplObjects = null
}
}
// now send all pending messages
connection.logger.error("Sending pending messages: ${pendingMessagesQueue.size}")
pendingMessagesQueue.forEach {
connection.send(it, false)
}
}
fun save(connection: CONNECTION) {
connection.logger.error("BACKING UP RMI OBJECTS: ${connection.uuid.toHexString()}")
val allProxyObjects = connection.rmi.getAllProxyObjects()
val allImplObjects = connection.rmi.getAllImplObjects()
allProxyObjects.forEach {
val rmiClient = Proxy.getInvocationHandler(it) as RmiClient
val rmiId = rmiClient.rmiObjectId
connection.logger.error("PROXY: $rmiId")
}
allImplObjects.forEach {
connection.logger.error("IMPL: ${it.first} : ${it.second.javaClass}")
}
// we want to save all the connection RMI objects, so they can be recreated on connect
lock.withLock {
oldProxyObjects = allProxyObjects
oldImplObjects = allImplObjects
}
}
fun queueMessage(connection: SessionConnection, message: Any, abortEarly: Boolean) {
if (!abortEarly) {
// this was a "normal" send (instead of the disconnect message).
pendingMessagesQueue.put(message)
}
else if (connection.endPoint.aeronDriver.internal.mustRestartDriverOnError) {
// the only way we get errors, is if the connection is bad OR if we are sending so fast that the connection cannot keep up.
// don't restart/reconnect -- there was an internal network error
pendingMessagesQueue.put(message)
}
else if (!connection.isConnected()) {
// there was an issue - the connection should automatically reconnect
pendingMessagesQueue.put(message)
}
}
}

View File

@ -14,16 +14,21 @@
* limitations under the License.
*/
package dorkbox.network
package dorkbox.network.connection.session
import dorkbox.network.Client
import dorkbox.network.ClientConfiguration
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.session.SessionConnection
class SessionClient<CONNECTION: SessionConnection>(config: ClientConfiguration = ClientConfiguration(), loggerName: String = Client::class.java.simpleName):
open class SessionClient<CONNECTION: SessionConnection>(config: ClientConfiguration = ClientConfiguration(), loggerName: String = Client::class.java.simpleName):
Client<CONNECTION>(config, loggerName) {
override fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
@Suppress("UNCHECKED_CAST")
return SessionConnection(connectionParameters) as CONNECTION
}
open fun newSession(): Session<CONNECTION> {
return Session()
}
}

View File

@ -21,20 +21,24 @@ import dorkbox.network.connection.ConnectionParams
open class SessionConnection(connectionParameters: ConnectionParams<*>): Connection(connectionParameters) {
@Volatile
internal lateinit var manager: Session<*>
lateinit var session: Session<*>
override fun send(message: Any, abortEarly: Boolean): Boolean {
val success = super.send(message, abortEarly)
if (!success) {
return manager.queueMessage(this, message, abortEarly)
session.queueMessage(this, message, abortEarly)
}
return success
}
fun sendPendingMessages() {
manager.pendingMessagesQueue.forEach {
send(it, false)
// now send all pending messages
if (logger.isTraceEnabled) {
logger.trace("Sending pending messages: ${session.pendingMessagesQueue.size}")
}
session.pendingMessagesQueue.forEach {
super.send(it, false)
}
}
}

View File

@ -108,14 +108,21 @@ internal open class SessionManager<CONNECTION: SessionConnection>(config: Config
existing
} else {
isNewSession = true
val newSession = Session<CONNECTION>(aeronDriver)
@Suppress("UNCHECKED_CAST")
val newSession: Session<CONNECTION> = if (connection.endPoint.isServer()) {
(connection.endPoint as SessionServer).newSession() as Session<CONNECTION>
} else {
(connection.endPoint as SessionClient).newSession() as Session<CONNECTION>
}
sessions[publicKeyWrapped] = newSession
newSession
}
}
}
connection.manager = session
connection.session = session
session.restore(connection)
return isNewSession

View File

@ -14,15 +14,20 @@
* limitations under the License.
*/
package dorkbox.network
package dorkbox.network.connection.session
import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.session.SessionConnection
class SessionServer<CONNECTION: SessionConnection>(config: ServerConfiguration = ServerConfiguration(), loggerName: String = Server::class.java.simpleName):
open class SessionServer<CONNECTION: SessionConnection>(config: ServerConfiguration = ServerConfiguration(), loggerName: String = Server::class.java.simpleName):
Server<CONNECTION>(config, loggerName) {
override fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
@Suppress("UNCHECKED_CAST")
return SessionConnection(connectionParameters) as CONNECTION
}
open fun newSession(): Session<CONNECTION> {
return Session()
}
}

View File

@ -76,10 +76,7 @@ open class RmiObjectCache(val logger: Logger) {
/**
* @return all the saved RMI implementation objects along with their RMI ID. This is used by session management in order to preserve RMI functionality.
*/
internal fun restoreAllImplObjects(implObjects: List<Pair<Int, Any>>) {
implObjects.forEach {
logger.error("RESTORED: ${it.first}")
}
internal fun restoreImplObjects(implObjects: List<Pair<Int, Any>>) {
this.implObjects.restoreAll(implObjects)
}
}

View File

@ -116,7 +116,6 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
iface
)
logger.error("RESTORED: $rmiId")
saveProxyObject(rmiId, proxyObject)
}
}

View File

@ -18,10 +18,10 @@ package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.SessionClient
import dorkbox.network.SessionServer
import dorkbox.network.connection.Connection
import dorkbox.network.connection.session.SessionClient
import dorkbox.network.connection.session.SessionConnection
import dorkbox.network.connection.session.SessionServer
import dorkbox.network.rmi.RemoteObject
import dorkboxTest.network.rmi.cows.MessageWithTestCow
import dorkboxTest.network.rmi.cows.TestCow