Cleaned up how new aeron drivers are created

This commit is contained in:
Robinson 2023-08-09 22:11:25 -06:00
parent 96cd987238
commit 90d218637c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
2 changed files with 77 additions and 47 deletions

View File

@ -70,10 +70,6 @@ fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, por
*/ */
class AeronDriver private constructor(config: Configuration, val logger: KLogger, val endPoint: EndPoint<*>?) { class AeronDriver private constructor(config: Configuration, val logger: KLogger, val endPoint: EndPoint<*>?) {
constructor(config: Configuration, logger: KLogger) : this(config, logger, null)
constructor(endPoint: EndPoint<*>) : this(endPoint.config, endPoint.logger, endPoint)
companion object { companion object {
/** /**
* Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW * Identifier for invalid sessions. This must be < RESERVED_SESSION_ID_LOW
@ -101,6 +97,21 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
// have to keep track of configurations and drivers, as we do not want to start the same media driver configuration multiple times (this causes problems!) // have to keep track of configurations and drivers, as we do not want to start the same media driver configuration multiple times (this causes problems!)
internal val driverConfigurations = IntMap<AeronDriverInternal>(4) internal val driverConfigurations = IntMap<AeronDriverInternal>(4)
fun new(endPoint: EndPoint<*>): AeronDriver {
var driver: AeronDriver? = null
runBlocking {
withLock {
driver = AeronDriver(endPoint.config, endPoint.logger, endPoint)
}
}
return driver!!
}
suspend fun withLock(action: suspend () -> Unit) {
lock.withReentrantLock(action)
}
/** /**
* Ensures that an endpoint (using the specified configuration) is NO LONGER running. * Ensures that an endpoint (using the specified configuration) is NO LONGER running.
@ -112,8 +123,11 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
return true return true
} }
val stopped = AeronDriver(configuration, logger, null).use { var stopped = false
it.ensureStopped(timeout, 500) withLock {
stopped = AeronDriver(configuration, logger, null).use {
it.ensureStopped(timeout, 500)
}
} }
// hacky, but necessary for multiple checks // hacky, but necessary for multiple checks
@ -145,8 +159,11 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
* @return true if the media driver is active and running * @return true if the media driver is active and running
*/ */
suspend fun isRunning(configuration: Configuration, logger: KLogger): Boolean { suspend fun isRunning(configuration: Configuration, logger: KLogger): Boolean {
val running = AeronDriver(configuration, logger).use { var running = false
it.isRunning() withLock {
running = AeronDriver(configuration, logger, null).use {
it.isRunning()
}
} }
return running return running
@ -468,35 +485,34 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
// assign the driver for this configuration. THIS IS GLOBAL for a JVM, because for a specific configuration, aeron only needs to be initialized ONCE. // assign the driver for this configuration. THIS IS GLOBAL for a JVM, because for a specific configuration, aeron only needs to be initialized ONCE.
// we have INSTANCE of the "wrapper" AeronDriver, because we want to be able to have references to the logger when doing things, // we have INSTANCE of the "wrapper" AeronDriver, because we want to be able to have references to the logger when doing things,
// however - the code that actually does stuff is a "singleton" in regard to an aeron configuration // however - the code that actually does stuff is a "singleton" in regard to an aeron configuration
internal = runBlocking { val driverId = mediaDriverConfig.mediaDriverId()
lock.withLock {
val driverId = mediaDriverConfig.id
var driver = driverConfigurations.get(driverId) logger.error { "Aeron Driver [$driverId]: Initializing..." }
if (driver == null) { val aeronDriver = driverConfigurations.get(driverId)
driver = AeronDriverInternal(endPoint, mediaDriverConfig, logger) if (aeronDriver == null) {
val driver = AeronDriverInternal(endPoint, mediaDriverConfig, logger)
driverConfigurations.put(driverId, driver) driverConfigurations.put(driverId, driver)
// register a logger so that we are notified when there is an error in Aeron // register a logger so that we are notified when there is an error in Aeron
driver.addError { driver.addError {
logger.error(this) { "Aeron Driver [$driverId]: error!" } logger.error(this) { "Aeron Driver [$driverId]: error!" }
}
if (logEverything) {
logger.debug { "Aeron Driver [$driverId]: Creating at '${driver.aeronDirectory}'" }
}
} else {
if (logEverything) {
logger.debug { "Aeron Driver [$driverId]: Reusing driver" }
}
// assign our endpoint to the driver
driver.addEndpoint(endPoint)
}
driver
} }
if (logEverything) {
logger.debug { "Aeron Driver [$driverId]: Creating at '${driver.aeronDirectory}'" }
}
internal = driver
} else {
if (logEverything) {
logger.debug { "Aeron Driver [$driverId]: Reusing driver" }
}
// assign our endpoint to the driver
aeronDriver.addEndpoint(endPoint)
internal = aeronDriver
} }
} }
@ -1055,4 +1071,25 @@ class AeronDriver private constructor(config: Configuration, val logger: KLogger
return false return false
} }
} }
suspend fun newIfClosed(): AeronDriver {
endPoint!!
var driver: AeronDriver? = null
withLock {
driver = if (closed()) {
// Only starts the media driver if we are NOT already running!
try {
AeronDriver(endPoint.config, endPoint.logger, endPoint)
} catch (e: Exception) {
throw endPoint.newException("Error initializing aeron driver", e)
}
} else {
this
}
}
return driver!!
}
} }

View File

@ -231,13 +231,13 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
} }
} }
aeronDriver = try {
try {
@Suppress("LeakingThis") @Suppress("LeakingThis")
aeronDriver = AeronDriver(this) AeronDriver.new(this@EndPoint)
} catch (e: Exception) { } catch (e: Exception) {
listenerManager.notifyError(Exception("Error initializing endpoint", e)) val exception = Exception("Error initializing endpoint", e)
throw e listenerManager.notifyError(exception)
throw exception
} }
rmiConnectionSupport = if (type.javaClass == Server::class.java) { rmiConnectionSupport = if (type.javaClass == Server::class.java) {
@ -334,15 +334,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
* @throws Exception if there is a problem starting the media driver * @throws Exception if there is a problem starting the media driver
*/ */
suspend fun startDriver() { suspend fun startDriver() {
if (aeronDriver.closed()) { // recreate the driver if we have previously closed. If we have never run, this does nothing
// Only starts the media driver if we are NOT already running! aeronDriver = aeronDriver.newIfClosed()
try {
aeronDriver = AeronDriver(this)
} catch (e: Exception) {
throw newException("Error initializing aeron driver", e)
}
}
aeronDriver.start() aeronDriver.start()
} }