ListenerManager now uses volatile arrays instead of atomic (still follows single-writer-principle)

This commit is contained in:
Robinson 2023-07-03 21:40:07 +02:00
parent 2b4ba1347e
commit b4a57c9525
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C

View File

@ -20,7 +20,6 @@ import dorkbox.network.ipFilter.IpFilterRule
import dorkbox.os.OS
import dorkbox.util.classes.ClassHelper
import dorkbox.util.classes.ClassHierarchy
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KLogger
@ -160,25 +159,32 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
}
// initialize emtpy arrays
private val onConnectFilterList = atomic(Array<suspend (CONNECTION.() -> Boolean)>(0) { { true } })
@Volatile
private var onConnectFilterList = Array<suspend (CONNECTION.() -> Boolean)>(0) { { true } }
private val onConnectFilterMutex = Mutex()
private val onInitList = atomic(Array<suspend (CONNECTION.() -> Unit)>(0) { { } })
@Volatile
private var onInitList = Array<suspend (CONNECTION.() -> Unit)>(0) { { } }
private val onInitMutex = Mutex()
private val onConnectList = atomic(Array<suspend (CONNECTION.() -> Unit)>(0) { { } })
@Volatile
private var onConnectList = Array<suspend (CONNECTION.() -> Unit)>(0) { { } }
private val onConnectMutex = Mutex()
private val onDisconnectList = atomic(Array<suspend CONNECTION.() -> Unit>(0) { { } })
@Volatile
private var onDisconnectList = Array<suspend CONNECTION.() -> Unit>(0) { { } }
private val onDisconnectMutex = Mutex()
private val onErrorList = atomic(Array<suspend CONNECTION.(Throwable) -> Unit>(0) { { } })
@Volatile
private var onErrorList = Array<suspend CONNECTION.(Throwable) -> Unit>(0) { { } }
private val onErrorMutex = Mutex()
private val onErrorGlobalList = atomic(Array<suspend Throwable.() -> Unit>(0) { { } })
@Volatile
private var onErrorGlobalList = Array<suspend Throwable.() -> Unit>(0) { { } }
private val onErrorGlobalMutex = Mutex()
private val onMessageMap = atomic(IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR))
@Volatile
private var onMessageMap = IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR)
private val onMessageMutex = Mutex()
// used to keep a cache of class hierarchy for distributing messages
@ -213,7 +219,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun filter(function: suspend CONNECTION.() -> Boolean) {
onConnectFilterMutex.withLock {
// we have to follow the single-writer principle!
onConnectFilterList.lazySet(add(function, onConnectFilterList.value))
onConnectFilterList = add(function, onConnectFilterList)
}
}
@ -226,7 +232,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun onInit(function: suspend CONNECTION.() -> Unit) {
onInitMutex.withLock {
// we have to follow the single-writer principle!
onInitList.lazySet(add(function, onInitList.value))
onInitList = add(function, onInitList)
}
}
@ -237,7 +243,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun onConnect(function: suspend CONNECTION.() -> Unit) {
onConnectMutex.withLock {
// we have to follow the single-writer principle!
onConnectList.lazySet(add(function, onConnectList.value))
onConnectList = add(function, onConnectList)
}
}
@ -249,7 +255,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun onDisconnect(function: suspend CONNECTION.() -> Unit) {
onDisconnectMutex.withLock {
// we have to follow the single-writer principle!
onDisconnectList.lazySet(add(function, onDisconnectList.value))
onDisconnectList = add(function, onDisconnectList)
}
}
@ -261,7 +267,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun onError(function: suspend CONNECTION.(Throwable) -> Unit) {
onErrorMutex.withLock {
// we have to follow the single-writer principle!
onErrorList.lazySet(add(function, onErrorList.value))
onErrorList = add(function, onErrorList)
}
}
@ -273,7 +279,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
suspend fun onError(function: suspend Throwable.() -> Unit) {
onErrorGlobalMutex.withLock {
// we have to follow the single-writer principle!
onErrorGlobalList.lazySet(add(function, onErrorGlobalList.value))
onErrorGlobalList = add(function, onErrorGlobalList)
}
}
@ -307,7 +313,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
// NOTE: https://github.com/Kotlin/kotlinx.atomicfu
// this is EXPLICITLY listed as a "Don't" via the documentation. The ****ONLY**** reason this is actually OK is because
// we are following the "single-writer principle", so only ONE THREAD can modify this at a time.
val tempMap = onMessageMap.value
val tempMap = onMessageMap
@Suppress("UNCHECKED_CAST")
val func = function as suspend (CONNECTION, Any) -> Unit
@ -324,7 +330,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
}
tempMap.put(messageClass, newMessageArray)
onMessageMap.lazySet(tempMap)
onMessageMap = tempMap
} else {
throw IllegalArgumentException("Unable to add incompatible types! Detected connection/message classes: $connectionClass, $messageClass")
}
@ -350,10 +356,10 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
// by default, there is a SINGLE rule that will always exist, and will always ACCEPT ALL connections.
// This is so the array types can be setup (the compiler needs SOMETHING there)
val arrayOfIpFilterRules = onConnectFilterList.value
val list = onConnectFilterList
// if there is a rule, a connection must match for it to connect
arrayOfIpFilterRules.forEach {
list.forEach {
if (it.invoke(connection)) {
return true
}
@ -362,7 +368,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
// default if nothing matches
// NO RULES ADDED -> ACCEPT
// RULES ADDED -> DENY
return arrayOfIpFilterRules.isEmpty()
return list.isEmpty()
}
/**
@ -372,7 +378,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* Because of this guarantee, init is immediately executed where connect is on a separate thread
*/
suspend fun notifyInit(connection: CONNECTION) {
val list = onInitList.value
val list = onInitList
list.forEach {
try {
it(connection)
@ -390,7 +396,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* NOTE: This is run on the EventDispatch!
*/
fun notifyConnect(connection: CONNECTION) {
val list = onConnectList.value
val list = onConnectList
if (list.isNotEmpty()) {
EventDispatcher.CONNECT.launch {
list.forEach {
@ -424,7 +430,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* This is invoked by either a GLOBAL listener manager, or for a SPECIFIC CONNECTION listener manager.
*/
fun directNotifyDisconnect(connection: CONNECTION) {
val list = onDisconnectList.value
val list = onDisconnectList
if (list.isNotEmpty()) {
EventDispatcher.DISCONNECT.launch {
list.forEach {
@ -449,7 +455,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* NOTE: This is run on the EventDispatch!
*/
fun notifyError(connection: CONNECTION, exception: Throwable) {
val list = onErrorList.value
val list = onErrorList
if (list.isNotEmpty()) {
EventDispatcher.ERROR.launch {
list.forEach {
@ -473,7 +479,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
* The error is also sent to an error log before notifying callbacks
*/
fun notifyError(exception: Throwable) {
val list = onErrorGlobalList.value
val list = onErrorGlobalList
if (list.isNotEmpty()) {
EventDispatcher.ERROR.launch {
list.forEach {
@ -516,7 +522,7 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
// cache the lookup
// we don't care about race conditions, since the object hierarchy will be ALREADY established at this exact moment
val tempMap = onMessageMap.value
val tempMap = onMessageMap
var hasListeners = false
hierarchy.forEach { clazz ->
val onMessageArray: Array<suspend (CONNECTION, Any) -> Unit>? = tempMap.get(clazz)
@ -544,25 +550,25 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
logger.debug { "Closing the listener manager" }
onConnectFilterMutex.withLock {
onConnectFilterList.lazySet(Array(0) { { true } })
onConnectFilterList = Array(0) { { true } }
}
onInitMutex.withLock {
onInitList.lazySet(Array(0) { { } })
onInitList = Array(0) { { } }
}
onConnectMutex.withLock {
onConnectList.lazySet(Array(0) { { } })
onConnectList = Array(0) { { } }
}
onDisconnectMutex.withLock {
onDisconnectList.lazySet(Array(0) { { } })
onDisconnectList = Array(0) { { } }
}
onErrorMutex.withLock {
onErrorList.lazySet(Array(0) { { } })
onErrorList = Array(0) { { } }
}
onErrorGlobalMutex.withLock {
onErrorGlobalList.lazySet(Array(0) { { } })
onErrorGlobalList = Array(0) { { } }
}
onMessageMutex.withLock {
onMessageMap.lazySet(IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR))
onMessageMap = IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR)
}
}
}