Fixed issues with client/server onDisconnect messages. Client/server.close() will now wait for all onDisconnect() callbacks to finish before returning from the close() method

This commit is contained in:
nathan 2020-08-27 13:57:30 +02:00
parent 46bee0b977
commit 39a77c4e65
4 changed files with 70 additions and 69 deletions

View File

@ -433,12 +433,22 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
isConnected = false isConnected = false
super.close() super.close()
// in the client, "notifyDisconnect" will NEVER be called, because it's only called on a connection! // in the client, "client-notifyDisconnect" will NEVER be called, because it's only called on a connection!
// (meaning, 'connection-notifiyDisconnect' is what is called)
// manually call it. // manually call it.
if (con != null) { if (con != null) {
runBlocking { // this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
val job = actionDispatch.launch {
listenerManager.notifyDisconnect(con) listenerManager.notifyDisconnect(con)
} }
// when we close a client or a server, we want to make sure that ALL notifications are finished.
// when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
// NOTE: this must be the LAST thing happening!
runBlocking {
job.join()
}
} }
} }

View File

@ -30,7 +30,9 @@ import dorkbox.network.rmi.TimeoutException
import io.aeron.FragmentAssembler import io.aeron.FragmentAssembler
import io.aeron.Image import io.aeron.Image
import io.aeron.logbuffer.Header import io.aeron.logbuffer.Header
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
@ -265,6 +267,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
false false
} }
}, { connectionToClean -> }, { connectionToClean ->
logger.info {"[${connectionToClean.sessionId}] cleaned-up connection"}
// have to free up resources! // have to free up resources!
handshake.cleanup(connectionToClean) handshake.cleanup(connectionToClean)
@ -274,6 +278,11 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
// the compareAndSet is used to make sure that if we call close() MANUALLY, when the auto-cleanup/disconnect is called -- it doesn't // the compareAndSet is used to make sure that if we call close() MANUALLY, when the auto-cleanup/disconnect is called -- it doesn't
// try to do it again. // try to do it again.
connectionToClean.close() connectionToClean.close()
// have to manually notify the server-listenerManager that this connection was closed
// if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
// instantly notified and on cleanup, the server-listenermanager is called
listenerManager.notifyDisconnect(connectionToClean)
}) })
@ -296,7 +305,6 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
} }
} }
/** /**
* Adds an IP+subnet rule that defines what type of connection this IP+subnet should have. * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have.
* - NOTHING : Nothing happens to the in/out bytes * - NOTHING : Nothing happens to the in/out bytes
@ -316,68 +324,14 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
} }
/** /**
* Safely sends objects to a destination * Safely sends objects to a destination.
*/ */
suspend fun send(message: Any) { suspend fun send(message: Any) {
connections.send(message) connections.forEach {
it.send(message)
}
} }
/**
* When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener,
* and ALL connections are notified of that listener.
* <br></br>
* It is POSSIBLE to add a server-connection 'local' listener (via connection.addListener), meaning that ONLY
* that listener attached to the connection is notified on that event (ie, admin type listeners)
*
* @return a newly created listener manager for the connection
*/
// fun addListenerManager(connection: C): ConnectionManager<C> {
// return connectionManager.addListenerManager(connection)
// }
/**
* When called by a server, NORMALLY listeners are added at the GLOBAL level (meaning, I add one listener,
* and ALL connections are notified of that listener.
* <br></br>
* It is POSSIBLE to remove a server-connection 'local' listener (via connection.removeListener), meaning that ONLY
* that listener attached to the connection is removed
*
*
* This removes the listener manager for that specific connection
*/
// fun removeListenerManager(connection: C) {
// connectionManager.removeListenerManager(connection)
// }
//
//
// /**
// * Creates a "global" remote object for use by multiple connections.
// *
// * @return the ID assigned to this RMI object
// */
// fun <T> create(objectId: Short, globalObject: T) {
// return rmiGlobalObjects.register(globalObject)
// }
//
// /**
// * Creates a "global" remote object for use by multiple connections.
// *
// * @return the ID assigned to this RMI object
// */
// fun <T> create(`object`: T): Short {
// return rmiGlobalObjects.register(`object`) ?: 0
// }
//
//
//
/** /**
* TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of) * TODO: when adding a "custom" connection, it's super important to not have to worry about the sessionID (which is what we key off of)
* Adds a custom connection to the server. * Adds a custom connection to the server.
@ -405,9 +359,50 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
connections.remove(connection) connections.remove(connection)
} }
/**
* Closes the server and all it's connections. After a close, you may call 'bind' again.
*/
override fun close() { override fun close() {
super.close() super.close()
bindAlreadyCalled = false bindAlreadyCalled = false
// when we call close, it will shutdown the polling mechanism, so we have to manually cleanup the connections and call server-notifyDisconnect
// on them
runBlocking {
val jobs = mutableListOf<Job>()
// we want to clear all the connections FIRST (since we are shutting down)
val cons = mutableListOf<CONNECTION>()
connections.forEach { cons.add(it) }
connections.clear()
cons.forEach { connection ->
logger.error("${connection.id} cleanup")
// have to free up resources!
handshake.cleanup(connection)
// make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
connection.close()
// have to manually notify the server-listenerManager that this connection was closed
// if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
// instantly notified and on cleanup, the server-listenermanager is called
// NOTE: this must be the LAST thing happening!
// this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
val job = actionDispatch.launch {
listenerManager.notifyDisconnect(connection)
}
jobs.add(job)
}
// when we close a client or a server, we want to make sure that ALL notifications are finished.
// when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
jobs.forEach { it.join() }
}
} }

View File

@ -93,12 +93,9 @@ internal open class ConnectionManager<CONNECTION: Connection>() {
} }
/** /**
* Safely sends objects to a destination (such as a custom object or a standard ping). This will automatically choose which protocol * Removes all connections. Does not call close or anything else on them
* is available to use.
*/ */
suspend inline fun send(message: Any) { fun clear() {
forEach { connections.clear()
it.send(message)
}
} }
} }

View File

@ -122,7 +122,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
private var aeron: Aeron? = null private var aeron: Aeron? = null
/** /**
* Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. * Returns the serialization wrapper if there is an object type that needs to be added outside of the basic types.
*/ */
val serialization: Serialization val serialization: Serialization
@ -695,7 +695,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
rmiGlobalSupport.close() rmiGlobalSupport.close()
runBlocking { runBlocking {
// don't need anything fast or fancy here, because this method will only be called once
connections.forEach { connections.forEach {
it.close() it.close()
} }