OnError callbacks are now suspending

This commit is contained in:
Robinson 2023-06-29 12:12:29 +02:00
parent 15f93cf1b0
commit dec10ab4bc
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 55 additions and 49 deletions

View File

@ -188,7 +188,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger) internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger)
internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION> internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION>
private val streamingManager = StreamingManager<CONNECTION>(logger, messageDispatch) private val streamingManager = StreamingManager<CONNECTION>(logger, messageDispatch, config)
private val pingManager = PingManager<CONNECTION>() private val pingManager = PingManager<CONNECTION>()
@ -416,7 +416,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* *
* The error is also sent to an error log before this method is called. * The error is also sent to an error log before this method is called.
*/ */
fun onError(function: CONNECTION.(Throwable) -> Unit) { fun onError(function: suspend CONNECTION.(Throwable) -> Unit) {
runBlocking { runBlocking {
listenerManager.onError(function) listenerManager.onError(function)
} }
@ -427,7 +427,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* *
* The error is also sent to an error log before this method is called. * The error is also sent to an error log before this method is called.
*/ */
fun onErrorGlobal(function: (Throwable) -> Unit) { fun onErrorGlobal(function: suspend (Throwable) -> Unit) {
runBlocking { runBlocking {
listenerManager.onError(function) listenerManager.onError(function)
} }

View File

@ -160,7 +160,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
} }
// initialize emtpy arrays // initialize emtpy arrays
private val onConnectFilterList = atomic(Array<(CONNECTION.() -> Boolean)>(0) { { true } }) private val onConnectFilterList = atomic(Array<suspend (CONNECTION.() -> Boolean)>(0) { { true } })
private val onConnectFilterMutex = Mutex() private val onConnectFilterMutex = Mutex()
private val onInitList = atomic(Array<suspend (CONNECTION.() -> Unit)>(0) { { } }) private val onInitList = atomic(Array<suspend (CONNECTION.() -> Unit)>(0) { { } })
@ -172,10 +172,10 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
private val onDisconnectList = atomic(Array<suspend CONNECTION.() -> Unit>(0) { { } }) private val onDisconnectList = atomic(Array<suspend CONNECTION.() -> Unit>(0) { { } })
private val onDisconnectMutex = Mutex() private val onDisconnectMutex = Mutex()
private val onErrorList = atomic(Array<CONNECTION.(Throwable) -> Unit>(0) { { } }) private val onErrorList = atomic(Array<suspend CONNECTION.(Throwable) -> Unit>(0) { { } })
private val onErrorMutex = Mutex() private val onErrorMutex = Mutex()
private val onErrorGlobalList = atomic(Array<Throwable.() -> Unit>(0) { { } }) private val onErrorGlobalList = atomic(Array<suspend Throwable.() -> Unit>(0) { { } })
private val onErrorGlobalMutex = Mutex() private val onErrorGlobalMutex = Mutex()
private val onMessageMap = atomic(IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR)) private val onMessageMap = atomic(IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR))
@ -210,7 +210,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* *
* For a server, this function will be called for ALL clients. * For a server, this function will be called for ALL clients.
*/ */
suspend fun filter(function: CONNECTION.() -> Boolean) { suspend fun filter(function: suspend CONNECTION.() -> Boolean) {
onConnectFilterMutex.withLock { onConnectFilterMutex.withLock {
// we have to follow the single-writer principle! // we have to follow the single-writer principle!
onConnectFilterList.lazySet(add(function, onConnectFilterList.value)) onConnectFilterList.lazySet(add(function, onConnectFilterList.value))
@ -258,7 +258,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* *
* The error is also sent to an error log before this method is called. * The error is also sent to an error log before this method is called.
*/ */
suspend fun onError(function: CONNECTION.(Throwable) -> Unit) { suspend fun onError(function: suspend CONNECTION.(Throwable) -> Unit) {
onErrorMutex.withLock { onErrorMutex.withLock {
// we have to follow the single-writer principle! // we have to follow the single-writer principle!
onErrorList.lazySet(add(function, onErrorList.value)) onErrorList.lazySet(add(function, onErrorList.value))
@ -270,7 +270,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* *
* The error is also sent to an error log before this method is called. * The error is also sent to an error log before this method is called.
*/ */
suspend fun onError(function: Throwable.() -> Unit) { suspend fun onError(function: suspend Throwable.() -> Unit) {
onErrorGlobalMutex.withLock { onErrorGlobalMutex.withLock {
// we have to follow the single-writer principle! // we have to follow the single-writer principle!
onErrorGlobalList.lazySet(add(function, onErrorGlobalList.value)) onErrorGlobalList.lazySet(add(function, onErrorGlobalList.value))
@ -340,7 +340,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* *
* @return true if the connection will be allowed to connect. False if we should terminate this connection * @return true if the connection will be allowed to connect. False if we should terminate this connection
*/ */
fun notifyFilter(connection: CONNECTION): Boolean { suspend fun notifyFilter(connection: CONNECTION): Boolean {
// remote address will NOT be null at this stage, but best to verify. // remote address will NOT be null at this stage, but best to verify.
val remoteAddress = connection.remoteAddress val remoteAddress = connection.remoteAddress
if (remoteAddress == null) { if (remoteAddress == null) {
@ -389,20 +389,20 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* *
* NOTE: This is run on the EventDispatch! * NOTE: This is run on the EventDispatch!
*/ */
fun notifyConnect(connection: CONNECTION, onCompleteFunction: () -> Unit = {}) { fun notifyConnect(connection: CONNECTION) {
val list = onConnectList.value val list = onConnectList.value
EventDispatcher.CONNECT.launch { if (list.isNotEmpty()) {
list.forEach { EventDispatcher.CONNECT.launch {
try { list.forEach {
it(connection) try {
} catch (t: Throwable) { it(connection)
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace } catch (t: Throwable) {
t.cleanStackTrace() // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
logger.error("Connection ${connection.id} error", t) t.cleanStackTrace()
logger.error("Connection ${connection.id} error", t)
}
} }
} }
onCompleteFunction()
} }
} }
@ -425,14 +425,16 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
*/ */
fun directNotifyDisconnect(connection: CONNECTION) { fun directNotifyDisconnect(connection: CONNECTION) {
val list = onDisconnectList.value val list = onDisconnectList.value
EventDispatcher.DISCONNECT.launch { if (list.isNotEmpty()) {
list.forEach { EventDispatcher.DISCONNECT.launch {
try { list.forEach {
it(connection) try {
} catch (t: Throwable) { it(connection)
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace } catch (t: Throwable) {
t.cleanStackTrace() // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
logger.error("Connection ${connection.id} error", t) t.cleanStackTrace()
logger.error("Connection ${connection.id} error", t)
}
} }
} }
} }
@ -447,19 +449,21 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* NOTE: This is run on the EventDispatch! * NOTE: This is run on the EventDispatch!
*/ */
fun notifyError(connection: CONNECTION, exception: Throwable) { fun notifyError(connection: CONNECTION, exception: Throwable) {
logger.error("Error with connection $connection", exception)
val list = onErrorList.value val list = onErrorList.value
EventDispatcher.ERROR.launch { if (list.isNotEmpty()) {
list.forEach { EventDispatcher.ERROR.launch {
try { list.forEach {
it(connection, exception) try {
} catch (t: Throwable) { it(connection, exception)
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace } catch (t: Throwable) {
t.cleanStackTrace() // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
logger.error("Connection ${connection.id} error", t) t.cleanStackTrace()
logger.error("Connection ${connection.id} error", t)
}
} }
} }
} else {
logger.error("Error with connection $connection", exception)
} }
} }
@ -469,19 +473,21 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* The error is also sent to an error log before notifying callbacks * The error is also sent to an error log before notifying callbacks
*/ */
fun notifyError(exception: Throwable) { fun notifyError(exception: Throwable) {
logger.error("Global error", exception)
val list = onErrorGlobalList.value val list = onErrorGlobalList.value
EventDispatcher.ERROR.launch { if (list.isNotEmpty()) {
list.forEach { EventDispatcher.ERROR.launch {
try { list.forEach {
it(exception) try {
} catch (t: Throwable) { it(exception)
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace } catch (t: Throwable) {
t.cleanStackTrace() // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
logger.error("Global error", t) t.cleanStackTrace()
logger.error("Global error", t)
}
} }
} }
} else {
logger.error("Global error", exception)
} }
} }