diff --git a/src/dorkbox/network/connection/ListenerManager.kt b/src/dorkbox/network/connection/ListenerManager.kt index 94191745..8d8b4359 100644 --- a/src/dorkbox/network/connection/ListenerManager.kt +++ b/src/dorkbox/network/connection/ListenerManager.kt @@ -20,7 +20,6 @@ import dorkbox.network.ipFilter.IpFilterRule import dorkbox.os.OS import dorkbox.util.classes.ClassHelper import dorkbox.util.classes.ClassHierarchy -import kotlinx.atomicfu.atomic import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import mu.KLogger @@ -160,25 +159,32 @@ internal class ListenerManager(private val logger: KLogg } // initialize emtpy arrays - private val onConnectFilterList = atomic(Array Boolean)>(0) { { true } }) + @Volatile + private var onConnectFilterList = Array Boolean)>(0) { { true } } private val onConnectFilterMutex = Mutex() - private val onInitList = atomic(Array Unit)>(0) { { } }) + @Volatile + private var onInitList = Array Unit)>(0) { { } } private val onInitMutex = Mutex() - private val onConnectList = atomic(Array Unit)>(0) { { } }) + @Volatile + private var onConnectList = Array Unit)>(0) { { } } private val onConnectMutex = Mutex() - private val onDisconnectList = atomic(Array Unit>(0) { { } }) + @Volatile + private var onDisconnectList = Array Unit>(0) { { } } private val onDisconnectMutex = Mutex() - private val onErrorList = atomic(Array Unit>(0) { { } }) + @Volatile + private var onErrorList = Array Unit>(0) { { } } private val onErrorMutex = Mutex() - private val onErrorGlobalList = atomic(Array Unit>(0) { { } }) + @Volatile + private var onErrorGlobalList = Array Unit>(0) { { } } private val onErrorGlobalMutex = Mutex() - private val onMessageMap = atomic(IdentityMap, Array Unit>>(32, LOAD_FACTOR)) + @Volatile + private var onMessageMap = IdentityMap, Array Unit>>(32, LOAD_FACTOR) private val onMessageMutex = Mutex() // used to keep a cache of class hierarchy for distributing messages @@ -213,7 +219,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun filter(function: suspend CONNECTION.() -> Boolean) { onConnectFilterMutex.withLock { // we have to follow the single-writer principle! - onConnectFilterList.lazySet(add(function, onConnectFilterList.value)) + onConnectFilterList = add(function, onConnectFilterList) } } @@ -226,7 +232,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun onInit(function: suspend CONNECTION.() -> Unit) { onInitMutex.withLock { // we have to follow the single-writer principle! - onInitList.lazySet(add(function, onInitList.value)) + onInitList = add(function, onInitList) } } @@ -237,7 +243,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun onConnect(function: suspend CONNECTION.() -> Unit) { onConnectMutex.withLock { // we have to follow the single-writer principle! - onConnectList.lazySet(add(function, onConnectList.value)) + onConnectList = add(function, onConnectList) } } @@ -249,7 +255,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun onDisconnect(function: suspend CONNECTION.() -> Unit) { onDisconnectMutex.withLock { // we have to follow the single-writer principle! - onDisconnectList.lazySet(add(function, onDisconnectList.value)) + onDisconnectList = add(function, onDisconnectList) } } @@ -261,7 +267,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun onError(function: suspend CONNECTION.(Throwable) -> Unit) { onErrorMutex.withLock { // we have to follow the single-writer principle! - onErrorList.lazySet(add(function, onErrorList.value)) + onErrorList = add(function, onErrorList) } } @@ -273,7 +279,7 @@ internal class ListenerManager(private val logger: KLogg suspend fun onError(function: suspend Throwable.() -> Unit) { onErrorGlobalMutex.withLock { // we have to follow the single-writer principle! - onErrorGlobalList.lazySet(add(function, onErrorGlobalList.value)) + onErrorGlobalList = add(function, onErrorGlobalList) } } @@ -307,7 +313,7 @@ internal class ListenerManager(private val logger: KLogg // NOTE: https://github.com/Kotlin/kotlinx.atomicfu // this is EXPLICITLY listed as a "Don't" via the documentation. The ****ONLY**** reason this is actually OK is because // we are following the "single-writer principle", so only ONE THREAD can modify this at a time. - val tempMap = onMessageMap.value + val tempMap = onMessageMap @Suppress("UNCHECKED_CAST") val func = function as suspend (CONNECTION, Any) -> Unit @@ -324,7 +330,7 @@ internal class ListenerManager(private val logger: KLogg } tempMap.put(messageClass, newMessageArray) - onMessageMap.lazySet(tempMap) + onMessageMap = tempMap } else { throw IllegalArgumentException("Unable to add incompatible types! Detected connection/message classes: $connectionClass, $messageClass") } @@ -350,10 +356,10 @@ internal class ListenerManager(private val logger: KLogg // by default, there is a SINGLE rule that will always exist, and will always ACCEPT ALL connections. // This is so the array types can be setup (the compiler needs SOMETHING there) - val arrayOfIpFilterRules = onConnectFilterList.value + val list = onConnectFilterList // if there is a rule, a connection must match for it to connect - arrayOfIpFilterRules.forEach { + list.forEach { if (it.invoke(connection)) { return true } @@ -362,7 +368,7 @@ internal class ListenerManager(private val logger: KLogg // default if nothing matches // NO RULES ADDED -> ACCEPT // RULES ADDED -> DENY - return arrayOfIpFilterRules.isEmpty() + return list.isEmpty() } /** @@ -372,7 +378,7 @@ internal class ListenerManager(private val logger: KLogg * Because of this guarantee, init is immediately executed where connect is on a separate thread */ suspend fun notifyInit(connection: CONNECTION) { - val list = onInitList.value + val list = onInitList list.forEach { try { it(connection) @@ -390,7 +396,7 @@ internal class ListenerManager(private val logger: KLogg * NOTE: This is run on the EventDispatch! */ fun notifyConnect(connection: CONNECTION) { - val list = onConnectList.value + val list = onConnectList if (list.isNotEmpty()) { EventDispatcher.CONNECT.launch { list.forEach { @@ -424,7 +430,7 @@ internal class ListenerManager(private val logger: KLogg * This is invoked by either a GLOBAL listener manager, or for a SPECIFIC CONNECTION listener manager. */ fun directNotifyDisconnect(connection: CONNECTION) { - val list = onDisconnectList.value + val list = onDisconnectList if (list.isNotEmpty()) { EventDispatcher.DISCONNECT.launch { list.forEach { @@ -449,7 +455,7 @@ internal class ListenerManager(private val logger: KLogg * NOTE: This is run on the EventDispatch! */ fun notifyError(connection: CONNECTION, exception: Throwable) { - val list = onErrorList.value + val list = onErrorList if (list.isNotEmpty()) { EventDispatcher.ERROR.launch { list.forEach { @@ -473,7 +479,7 @@ internal class ListenerManager(private val logger: KLogg * The error is also sent to an error log before notifying callbacks */ fun notifyError(exception: Throwable) { - val list = onErrorGlobalList.value + val list = onErrorGlobalList if (list.isNotEmpty()) { EventDispatcher.ERROR.launch { list.forEach { @@ -516,7 +522,7 @@ internal class ListenerManager(private val logger: KLogg // cache the lookup // we don't care about race conditions, since the object hierarchy will be ALREADY established at this exact moment - val tempMap = onMessageMap.value + val tempMap = onMessageMap var hasListeners = false hierarchy.forEach { clazz -> val onMessageArray: Array Unit>? = tempMap.get(clazz) @@ -544,25 +550,25 @@ internal class ListenerManager(private val logger: KLogg logger.debug { "Closing the listener manager" } onConnectFilterMutex.withLock { - onConnectFilterList.lazySet(Array(0) { { true } }) + onConnectFilterList = Array(0) { { true } } } onInitMutex.withLock { - onInitList.lazySet(Array(0) { { } }) + onInitList = Array(0) { { } } } onConnectMutex.withLock { - onConnectList.lazySet(Array(0) { { } }) + onConnectList = Array(0) { { } } } onDisconnectMutex.withLock { - onDisconnectList.lazySet(Array(0) { { } }) + onDisconnectList = Array(0) { { } } } onErrorMutex.withLock { - onErrorList.lazySet(Array(0) { { } }) + onErrorList = Array(0) { { } } } onErrorGlobalMutex.withLock { - onErrorGlobalList.lazySet(Array(0) { { } }) + onErrorGlobalList = Array(0) { { } } } onMessageMutex.withLock { - onMessageMap.lazySet(IdentityMap, Array Unit>>(32, LOAD_FACTOR)) + onMessageMap = IdentityMap, Array Unit>>(32, LOAD_FACTOR) } } }