driver endpoint list is now concurrent

This commit is contained in:
Robinson 2023-08-10 20:04:23 -06:00
parent 466363901c
commit ad9771263c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 22 additions and 16 deletions

View File

@ -16,9 +16,11 @@
package dorkbox.network.aeron
import dorkbox.collections.ConcurrentIterator
import dorkbox.collections.LockFreeHashSet
import dorkbox.network.Configuration
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.EventDispatcher
import dorkbox.network.connection.ListenerManager
import dorkbox.network.connection.ListenerManager.Companion.cleanAllStackTrace
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
@ -98,7 +100,7 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
val driverId = config.mediaDriverId()
internal val endPointUsages = mutableListOf<EndPoint<*>>()
internal val endPointUsages = ConcurrentIterator<EndPoint<*>>()
@Volatile
private var aeron: Aeron? = null
@ -151,19 +153,16 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
logger.error { "Aeron Driver [$driverId]: Critical driver error!" }
// make a copy
val endpoints = endPointUsages.toTypedArray()
runBlocking {
endpoints.forEach {
it.connections.forEach {conn ->
conn.closeImmediately(false, false)
}
EventDispatcher.ERROR.launch {
endPointUsages.forEach {
it.close(closeEverything = false,
notifyDisconnect = false,
releaseWaitingThreads = false)
}
// closing the driver here will SEGFAULT the jvm!! (cannot have reentrant code on this thread)
}
if (error.message?.startsWith("ERROR - channel error - Network is unreachable") == true) {
val exception = AeronDriverException("Network is disconnected or unreachable.")
exception.cleanAllStackTrace()
@ -746,8 +745,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
return true
}
if (endPointUsages.size > 1 && !endPointUsages.contains(endPoint)) {
logger.debug { "Aeron Driver [$driverId]: still referenced by ${endPointUsages.size} endpoints" }
if (endPointUsages.size() > 1 && !endPointUsages.contains(endPoint)) {
logger.debug { "Aeron Driver [$driverId]: still referenced by ${endPointUsages.size()} endpoints" }
return true
}
@ -804,13 +803,18 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
endPointUsages.remove(endPoint)
}
logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size} endpoints still in use)" }
logger.trace { "Aeron Driver [$driverId]: Requested close... (${endPointUsages.size()} endpoints still in use)" }
// ignore the extra driver checks, because in SOME situations, when trying to reconnect upon an error, the
if (isInUse(endPoint, logger)) {
if (!criticalDriverError) {
// driver gets into a bad state. When this happens, we have to ignore "are we already in use" checks, BECAUSE the driver is now corrupted and unusable!
if (criticalDriverError) {
// driver gets into a bad state. When this happens, we have to ignore "are we already in use" checks, BECAUSE the driver is now corrupted and unusable!
}
else {
logger.debug { "Aeron Driver [$driverId]: in use, not shutting down this instance." }
// reset our contextDefine value, so that this configuration can safely be reused
endPoint?.config?.contextDefined = false
return@withReentrantLock false
}
}
@ -818,6 +822,8 @@ internal class AeronDriverInternal(endPoint: EndPoint<*>?, config: Configuration
val removed = AeronDriver.driverConfigurations[driverId]
if (removed == null) {
logger.debug { "Aeron Driver [$driverId]: already closed. Ignoring close request." }
// reset our contextDefine value, so that this configuration can safely be reused
endPoint?.config?.contextDefined = false
return@withReentrantLock false
}