Added more log information when stopping the AeronMediaDriver. Code polish

This commit is contained in:
nathan 2020-09-23 17:09:34 +02:00
parent 93e406289c
commit c869306ebe
3 changed files with 14 additions and 62 deletions

View File

@ -32,6 +32,11 @@ object AeronConfig {
const val IPC_HANDSHAKE_STREAM_ID_PUB: Int = 0x1337c0de
const val IPC_HANDSHAKE_STREAM_ID_SUB: Int = 0x1337c0d3
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS
private fun create(config: Configuration, logger: KLogger): MediaDriver.Context {
/*
* Linux
@ -270,12 +275,16 @@ object AeronConfig {
*/
internal suspend fun stopDriver(mediaDriver: MediaDriver?, logger: KLogger = KotlinLogging.logger("AeronConfig")) {
if (mediaDriver == null) {
logger.debug { "No driver started for this instance. Not Stopping." }
return
}
val context = mediaDriver.context()
logger.debug("Stopping driver at '${context.aeronDirectory()}'...")
if (!isRunning(context)) {
// not running
logger.debug { "Driver is not running at '${context.aeronDirectory()}' for this context. Not Stopping." }
return
}
@ -284,21 +293,20 @@ object AeronConfig {
// on close, the publication CAN linger (in case a client goes away, and then comes back)
// AERON_PUBLICATION_LINGER_TIMEOUT, 5s by default (this can also be set as a URI param)
delay(5_000)
(context.sharedThreadFactory() as NamedThreadFactory).group.destroy()
delay(AERON_PUBLICATION_LINGER_TIMEOUT)
// wait for the media driver to actually stop
var count = 10
while (count-- >= 0 && isRunning(context)) {
logger.warn { "Aeron Media driver still running. Waiting for it to stop. Trying $count more times." }
logger.warn { "Aeron Media driver at '${context.aeronDirectory()}' is still running. Waiting for it to stop. Trying $count more times." }
delay(context.driverTimeoutMs())
}
} catch (e: Exception) {
logger.error("Error closing the media driver", e)
logger.error("Error closing the media driver at '${context.aeronDirectory()}'", e)
}
(context.sharedThreadFactory() as NamedThreadFactory).group.destroy()
logger.debug { "Closed the media driver at '${context.aeronDirectory()}'" }
}
}

View File

@ -1,28 +0,0 @@
package dorkbox.network.other
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.security.KeyFactory
import java.security.interfaces.XECPrivateKey
import java.security.spec.PKCS8EncodedKeySpec
/**
* Only public keys are ever sent across the wire.
*/
class XECPrivateKeySerializer : Serializer<XECPrivateKey>() {
private val keyFactory = KeyFactory.getInstance("EC")
override fun write(kryo: Kryo, output: Output, privateKey: XECPrivateKey) {
val encoded = privateKey.encoded
output.writeInt(encoded.size, true)
output.write(encoded)
}
override fun read(kryo: Kryo, input: Input, type: Class<out XECPrivateKey>): XECPrivateKey {
val length = input.readInt(true)
val encoded = ByteArray(length)
return keyFactory.generatePrivate(PKCS8EncodedKeySpec(encoded)) as XECPrivateKey
}
}

View File

@ -1,28 +0,0 @@
package dorkbox.network.other
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.security.KeyFactory
import java.security.interfaces.XECPublicKey
import java.security.spec.X509EncodedKeySpec
/**
* Only public keys are ever sent across the wire.
*/
class XECPublicKeySerializer : Serializer<XECPublicKey>() {
private val keyFactory = KeyFactory.getInstance("EC")
override fun write(kryo: Kryo, output: Output, publicKey: XECPublicKey) {
val encoded = publicKey.encoded
output.writeInt(encoded.size, true)
output.write(encoded)
}
override fun read(kryo: Kryo, input: Input, type: Class<out XECPublicKey>): XECPublicKey {
val length = input.readInt(true)
val encoded = ByteArray(length)
return keyFactory.generatePublic(X509EncodedKeySpec(encoded)) as XECPublicKey
}
}