Updated API for unittests
This commit is contained in:
parent
50ab7fc72f
commit
b8a6f5436d
|
@ -68,7 +68,7 @@ fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, por
|
|||
/**
|
||||
* Class for managing the Aeron+Media drivers
|
||||
*/
|
||||
class AeronDriver private constructor(config: Configuration, val logger: KLogger, val endPoint: EndPoint<*>?) {
|
||||
class AeronDriver constructor(config: Configuration, val logger: KLogger, val endPoint: EndPoint<*>?) {
|
||||
|
||||
companion object {
|
||||
/**
|
||||
|
|
|
@ -46,7 +46,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
conf.enableIPv6 = false
|
||||
conf.uniqueAeronDirectory = true
|
||||
|
||||
val driver = AeronDriver(conf, log)
|
||||
val driver = AeronDriver(conf, log, null)
|
||||
driver.start()
|
||||
driver
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
conf.enableIPv6 = false
|
||||
conf.uniqueAeronDirectory = true
|
||||
|
||||
val driver = AeronDriver(conf, log)
|
||||
val driver = AeronDriver(conf, log, null)
|
||||
driver.start()
|
||||
|
||||
clientDrivers.add(driver)
|
||||
|
@ -131,7 +131,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
conf.enableIPv6 = false
|
||||
conf.uniqueAeronDirectory = true
|
||||
|
||||
val driver = AeronDriver(conf, log)
|
||||
val driver = AeronDriver(conf, log, null)
|
||||
driver.start()
|
||||
driver
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ class AeronPubSubTest : BaseTest() {
|
|||
conf.enableIPv6 = false
|
||||
conf.uniqueAeronDirectory = true
|
||||
|
||||
val driver = AeronDriver(conf, log)
|
||||
val driver = AeronDriver(conf, log, null)
|
||||
driver.start()
|
||||
|
||||
clientDrivers.add(driver)
|
||||
|
|
|
@ -234,7 +234,7 @@ class DisconnectReconnectTest : BaseTest() {
|
|||
val log = KotlinLogging.logger("DCUnitTest")
|
||||
// NOTE: once a config is assigned to a driver, the config cannot be changed
|
||||
val aeronDriver = runBlocking {
|
||||
val driver = AeronDriver(serverConfig(), log)
|
||||
val driver = AeronDriver(serverConfig(), log, null)
|
||||
driver.start()
|
||||
driver
|
||||
}
|
||||
|
|
|
@ -133,13 +133,21 @@ class AeronRmiClientServer {
|
|||
}
|
||||
|
||||
else if (config.client) {
|
||||
val server = acs.server()
|
||||
|
||||
server.onMessage<ByteArray> {
|
||||
logger.error { "Received Byte array!" }
|
||||
}
|
||||
|
||||
val client = acs.client(0)
|
||||
|
||||
client.onDisconnect {
|
||||
logger.error { "Disconnect -> Reconnect..." }
|
||||
client.reconnect()
|
||||
}
|
||||
|
||||
client.connect(config.ip, 2000, 2001, 0) // UDP connection via loopback
|
||||
|
||||
|
||||
runBlocking {
|
||||
client.onConnect {
|
||||
logger.error { "Starting test..." }
|
||||
val secureRandom = SecureRandom()
|
||||
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 32
|
||||
|
||||
|
@ -154,7 +162,10 @@ class AeronRmiClientServer {
|
|||
|
||||
// just to start it up.
|
||||
repeat(5) {
|
||||
client.send(hugeData)
|
||||
if (!client.send(hugeData)) {
|
||||
client.logger.error { "Unable to send data!" }
|
||||
return@onConnect
|
||||
}
|
||||
}
|
||||
|
||||
client.logger.error { "Starting test." }
|
||||
|
@ -183,7 +194,14 @@ class AeronRmiClientServer {
|
|||
client.logger.error { "Rate is: ${amountInMB/timedInSeconds} MB/s" }
|
||||
client.logger.error { "Rate is: ${amountInmb/timedInSeconds} mb/s" }
|
||||
}
|
||||
|
||||
client.connect(config.ip, 2000, 2001, 0) // UDP connection via loopback
|
||||
runBlocking {
|
||||
client.waitForClose()
|
||||
client.logger.error { "DONE WAITING" }
|
||||
}
|
||||
}
|
||||
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
println("WHOOPS")
|
||||
|
@ -250,12 +268,8 @@ class AeronRmiClientServer {
|
|||
// }
|
||||
}
|
||||
|
||||
client.onDisconnect {
|
||||
logger.error("disconnect")
|
||||
}
|
||||
|
||||
client.onError { throwable ->
|
||||
logger.error("has error")
|
||||
logger.error("***has error***")
|
||||
throwable.printStackTrace()
|
||||
}
|
||||
|
||||
|
@ -263,19 +277,24 @@ class AeronRmiClientServer {
|
|||
logger.error("HAS MESSAGE! $message")
|
||||
}
|
||||
|
||||
SigInt.register {
|
||||
client.logger.info { "Shutting down via sig-int command" }
|
||||
runBlocking {
|
||||
client.close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = false)
|
||||
}
|
||||
}
|
||||
// SigInt.register {
|
||||
// client.logger.info { "Shutting down via sig-int command" }
|
||||
// runBlocking {
|
||||
// client.close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = false)
|
||||
// }
|
||||
// }
|
||||
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
fun config(config: Configuration) {
|
||||
config.settingsStore = Storage.Memory() // don't want to persist anything on disk!
|
||||
if (config is ServerConfiguration) {
|
||||
config.settingsStore = Storage.Property().file("config.json")
|
||||
} else {
|
||||
// don't want to persist anything on disk!
|
||||
config.settingsStore = Storage.Memory()
|
||||
}
|
||||
|
||||
config.appId = "aeron_test"
|
||||
config.enableIPv6 = false
|
||||
|
@ -284,7 +303,7 @@ class AeronRmiClientServer {
|
|||
// config.enableIpc = false
|
||||
// config.uniqueAeronDirectory = true
|
||||
|
||||
config.forceAllowSharedAeronDriver = true
|
||||
// config.forceAllowSharedAeronDriver = true
|
||||
|
||||
// dedicate more **OOMPF** to the network
|
||||
config.threadingMode = ThreadingMode.SHARED_NETWORK
|
||||
|
@ -367,7 +386,9 @@ class AeronRmiClientServer {
|
|||
SigInt.register {
|
||||
server.logger.info { "Shutting down via sig-int command" }
|
||||
runBlocking {
|
||||
server.close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = false)
|
||||
server.close(
|
||||
closeEverything = true, releaseWaitingThreads = true
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue