Added more unit tests for aeron connectivity

This commit is contained in:
Robinson 2023-12-04 11:10:52 +01:00
parent cdc056f3a1
commit 6bbf62f886
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 239 additions and 0 deletions

View File

@ -16,11 +16,15 @@
package dorkboxTest.network package dorkboxTest.network
import dorkbox.collections.LockFreeArrayList
import dorkbox.network.Configuration
import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.endpoint import dorkbox.network.aeron.endpoint
import dorkbox.network.exceptions.ClientTimedOutException import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.util.NamedThreadFactory
import io.aeron.CommonContext import io.aeron.CommonContext
import io.aeron.Publication import io.aeron.Publication
import kotlinx.atomicfu.atomic
import org.junit.Assert import org.junit.Assert
import org.junit.Test import org.junit.Test
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
@ -110,6 +114,241 @@ class AeronPubSubTest : BaseTest() {
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed()) Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
} }
@Test
fun reconnectTest() {
val log = LoggerFactory.getLogger("ConnectTest")
// NOTE: once a config is assigned to a driver, the config cannot be changed
val totalCount = 40
val port = 3535
val serverStreamId = 55555
val handshakeTimeoutNs = TimeUnit.SECONDS.toNanos(10)
val serverDriver = run {
val conf = serverConfig()
conf.enableIPv6 = false
conf.uniqueAeronDirectory = true
val driver = AeronDriver(conf, log, null)
driver.start()
driver
}
val clientDrivers = mutableListOf<AeronDriver>()
val clientPublications = mutableListOf<Pair<AeronDriver, Publication>>()
for (i in 1..totalCount) {
val conf = clientConfig()
conf.enableIPv6 = false
conf.uniqueAeronDirectory = true
val driver = AeronDriver(conf, log, null)
driver.start()
clientDrivers.add(driver)
}
val subscriptionUri = AeronDriver.uriHandshake(CommonContext.UDP_MEDIA, true)
.endpoint(true, "127.0.0.1", port)
val sub = serverDriver.addSubscription(subscriptionUri, serverStreamId, "server", false)
var sessionID = 1234567
clientDrivers.forEachIndexed { index, clientDriver ->
val publicationUri = AeronDriver.uri(CommonContext.UDP_MEDIA, sessionID++, true)
.endpoint(true, "127.0.0.1", port)
// can throw an exception! We catch it in the calling class
val publication = clientDriver.addPublication(publicationUri, serverStreamId, "client_$index", false)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
}
clientPublications.add(Pair(clientDriver, publication))
}
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.close(pub, "client_$index")
}
println("reconnecting..")
// THE RECONNECT PART
clientDrivers.forEachIndexed { index, clientDriver ->
val publicationUri = AeronDriver.uri(CommonContext.UDP_MEDIA, sessionID++, true)
.endpoint(true, "127.0.0.1", port)
// can throw an exception! We catch it in the calling class
val publication = clientDriver.addPublication(publicationUri, serverStreamId, "client_$index", false)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
}
clientPublications.add(Pair(clientDriver, publication))
}
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.close(pub, "client_$index")
}
println("Closing..")
clientDrivers.forEach { clientDriver ->
clientDriver.close()
}
serverDriver.close(sub, "server")
clientDrivers.forEach { clientDriver ->
clientDriver.ensureStopped(10_000, 500)
}
serverDriver.close()
serverDriver.ensureStopped(10_000, 500)
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
}
@Test
fun reconnectMultiTest() {
val log = LoggerFactory.getLogger("ConnectTest")
// NOTE: once a config is assigned to a driver, the config cannot be changed
val totalCount = 80
val port = 3535
val serverStreamId = 55555
val handshakeTimeoutNs = TimeUnit.SECONDS.toNanos(10)
val serverDriver = run {
val conf = serverConfig()
conf.enableIPv6 = false
conf.uniqueAeronDirectory = true
val driver = AeronDriver(conf, log, null)
driver.start()
driver
}
val clientDrivers = mutableListOf<AeronDriver>()
val clientPublications = LockFreeArrayList<Pair<AeronDriver, Publication>>()
for (i in 1..totalCount) {
val conf = clientConfig()
conf.enableIPv6 = false
conf.uniqueAeronDirectory = true
val driver = AeronDriver(conf, log, null)
driver.start()
clientDrivers.add(driver)
}
val subscriptionUri = AeronDriver.uriHandshake(CommonContext.UDP_MEDIA, true)
.endpoint(true, "127.0.0.1", port)
val sub = serverDriver.addSubscription(subscriptionUri, serverStreamId, "server", false)
val sessionID = atomic(1234567)
// if we are on the same JVM, the defaultScope for coroutines is SHARED, and limited!
val differentThreadLaunchers = Executors.newFixedThreadPool(totalCount/2,
NamedThreadFactory("Unit Test Client", Configuration.networkThreadGroup, true)
)
var latch = CountDownLatch(clientDrivers.size)
clientDrivers.forEachIndexed { index, clientDriver ->
differentThreadLaunchers.submit {
val publicationUri = AeronDriver.uri(CommonContext.UDP_MEDIA, sessionID.getAndIncrement(), true).endpoint(true, "127.0.0.1", port)
// can throw an exception! We catch it in the calling class
val publication = clientDriver.addPublication(publicationUri, serverStreamId, "client_$index", false)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
}
clientPublications.add(Pair(clientDriver, publication))
latch.countDown()
}
}
latch.await()
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.close(pub, "client_$index")
}
println("reconnecting..")
latch = CountDownLatch(clientDrivers.size)
// THE RECONNECT PART
clientDrivers.forEachIndexed { index, clientDriver ->
differentThreadLaunchers.submit {
val publicationUri = AeronDriver.uri(CommonContext.UDP_MEDIA, sessionID.getAndIncrement(), true).endpoint(true, "127.0.0.1", port)
// can throw an exception! We catch it in the calling class
val publication = clientDriver.addPublication(publicationUri, serverStreamId, "client_$index", false)
// can throw an exception! We catch it in the calling class
// we actually have to wait for it to connect before we continue
clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause ->
ClientTimedOutException("Client publication cannot connect with localhost server", cause)
}
clientPublications.add(Pair(clientDriver, publication))
latch.countDown()
}
}
latch.await()
clientPublications.forEachIndexed { index, (clientDriver, pub) ->
clientDriver.close(pub, "client_$index")
}
println("Closing..")
clientDrivers.forEach { clientDriver ->
clientDriver.close()
}
serverDriver.close(sub, "server")
clientDrivers.forEach { clientDriver ->
clientDriver.ensureStopped(10_000, 500)
}
serverDriver.close()
serverDriver.ensureStopped(10_000, 500)
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
}
@Test() @Test()
fun connectFailWithBadSessionIdTest() { fun connectFailWithBadSessionIdTest() {
val log = LoggerFactory.getLogger("ConnectTest") val log = LoggerFactory.getLogger("ConnectTest")