diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 829caf99..720ea65d 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -188,7 +188,7 @@ abstract class EndPoint private constructor(val type: C internal val rmiGlobalSupport = RmiManagerGlobal(logger) internal val rmiConnectionSupport: RmiManagerConnections - private val streamingManager = StreamingManager(logger, messageDispatch) + private val streamingManager = StreamingManager(logger, messageDispatch, config) private val pingManager = PingManager() @@ -416,7 +416,7 @@ abstract class EndPoint private constructor(val type: C * * The error is also sent to an error log before this method is called. */ - fun onError(function: CONNECTION.(Throwable) -> Unit) { + fun onError(function: suspend CONNECTION.(Throwable) -> Unit) { runBlocking { listenerManager.onError(function) } @@ -427,7 +427,7 @@ abstract class EndPoint private constructor(val type: C * * The error is also sent to an error log before this method is called. */ - fun onErrorGlobal(function: (Throwable) -> Unit) { + fun onErrorGlobal(function: suspend (Throwable) -> Unit) { runBlocking { listenerManager.onError(function) } diff --git a/src/dorkbox/network/connection/ListenerManager.kt b/src/dorkbox/network/connection/ListenerManager.kt index 39a3694f..94191745 100644 --- a/src/dorkbox/network/connection/ListenerManager.kt +++ b/src/dorkbox/network/connection/ListenerManager.kt @@ -160,7 +160,7 @@ internal class ListenerManager(private val logger: KLogg } // initialize emtpy arrays - private val onConnectFilterList = atomic(Array<(CONNECTION.() -> Boolean)>(0) { { true } }) + private val onConnectFilterList = atomic(Array Boolean)>(0) { { true } }) private val onConnectFilterMutex = Mutex() private val onInitList = atomic(Array Unit)>(0) { { } }) @@ -172,10 +172,10 @@ internal class ListenerManager(private val logger: KLogg private val onDisconnectList = atomic(Array Unit>(0) { { } }) private val onDisconnectMutex = Mutex() - private val onErrorList = atomic(Array Unit>(0) { { } }) + private val onErrorList = atomic(Array Unit>(0) { { } }) private val onErrorMutex = Mutex() - private val onErrorGlobalList = atomic(Array Unit>(0) { { } }) + private val onErrorGlobalList = atomic(Array Unit>(0) { { } }) private val onErrorGlobalMutex = Mutex() private val onMessageMap = atomic(IdentityMap, Array Unit>>(32, LOAD_FACTOR)) @@ -210,7 +210,7 @@ internal class ListenerManager(private val logger: KLogg * * For a server, this function will be called for ALL clients. */ - suspend fun filter(function: CONNECTION.() -> Boolean) { + suspend fun filter(function: suspend CONNECTION.() -> Boolean) { onConnectFilterMutex.withLock { // we have to follow the single-writer principle! onConnectFilterList.lazySet(add(function, onConnectFilterList.value)) @@ -258,7 +258,7 @@ internal class ListenerManager(private val logger: KLogg * * The error is also sent to an error log before this method is called. */ - suspend fun onError(function: CONNECTION.(Throwable) -> Unit) { + suspend fun onError(function: suspend CONNECTION.(Throwable) -> Unit) { onErrorMutex.withLock { // we have to follow the single-writer principle! onErrorList.lazySet(add(function, onErrorList.value)) @@ -270,7 +270,7 @@ internal class ListenerManager(private val logger: KLogg * * The error is also sent to an error log before this method is called. */ - suspend fun onError(function: Throwable.() -> Unit) { + suspend fun onError(function: suspend Throwable.() -> Unit) { onErrorGlobalMutex.withLock { // we have to follow the single-writer principle! onErrorGlobalList.lazySet(add(function, onErrorGlobalList.value)) @@ -340,7 +340,7 @@ internal class ListenerManager(private val logger: KLogg * * @return true if the connection will be allowed to connect. False if we should terminate this connection */ - fun notifyFilter(connection: CONNECTION): Boolean { + suspend fun notifyFilter(connection: CONNECTION): Boolean { // remote address will NOT be null at this stage, but best to verify. val remoteAddress = connection.remoteAddress if (remoteAddress == null) { @@ -389,20 +389,20 @@ internal class ListenerManager(private val logger: KLogg * * NOTE: This is run on the EventDispatch! */ - fun notifyConnect(connection: CONNECTION, onCompleteFunction: () -> Unit = {}) { + fun notifyConnect(connection: CONNECTION) { val list = onConnectList.value - EventDispatcher.CONNECT.launch { - list.forEach { - try { - it(connection) - } catch (t: Throwable) { - // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace - t.cleanStackTrace() - logger.error("Connection ${connection.id} error", t) + if (list.isNotEmpty()) { + EventDispatcher.CONNECT.launch { + list.forEach { + try { + it(connection) + } catch (t: Throwable) { + // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace + t.cleanStackTrace() + logger.error("Connection ${connection.id} error", t) + } } } - - onCompleteFunction() } } @@ -425,14 +425,16 @@ internal class ListenerManager(private val logger: KLogg */ fun directNotifyDisconnect(connection: CONNECTION) { val list = onDisconnectList.value - EventDispatcher.DISCONNECT.launch { - list.forEach { - try { - it(connection) - } catch (t: Throwable) { - // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace - t.cleanStackTrace() - logger.error("Connection ${connection.id} error", t) + if (list.isNotEmpty()) { + EventDispatcher.DISCONNECT.launch { + list.forEach { + try { + it(connection) + } catch (t: Throwable) { + // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace + t.cleanStackTrace() + logger.error("Connection ${connection.id} error", t) + } } } } @@ -447,19 +449,21 @@ internal class ListenerManager(private val logger: KLogg * NOTE: This is run on the EventDispatch! */ fun notifyError(connection: CONNECTION, exception: Throwable) { - logger.error("Error with connection $connection", exception) - val list = onErrorList.value - EventDispatcher.ERROR.launch { - list.forEach { - try { - it(connection, exception) - } catch (t: Throwable) { - // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace - t.cleanStackTrace() - logger.error("Connection ${connection.id} error", t) + if (list.isNotEmpty()) { + EventDispatcher.ERROR.launch { + list.forEach { + try { + it(connection, exception) + } catch (t: Throwable) { + // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace + t.cleanStackTrace() + logger.error("Connection ${connection.id} error", t) + } } } + } else { + logger.error("Error with connection $connection", exception) } } @@ -469,19 +473,21 @@ internal class ListenerManager(private val logger: KLogg * The error is also sent to an error log before notifying callbacks */ fun notifyError(exception: Throwable) { - logger.error("Global error", exception) - val list = onErrorGlobalList.value - EventDispatcher.ERROR.launch { - list.forEach { - try { - it(exception) - } catch (t: Throwable) { - // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace - t.cleanStackTrace() - logger.error("Global error", t) + if (list.isNotEmpty()) { + EventDispatcher.ERROR.launch { + list.forEach { + try { + it(exception) + } catch (t: Throwable) { + // NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace + t.cleanStackTrace() + logger.error("Global error", t) + } } } + } else { + logger.error("Global error", exception) } }