diff --git a/test/dorkboxTest/network/AeronPubSubTest.kt b/test/dorkboxTest/network/AeronPubSubTest.kt index 80c59457..7bf3d105 100644 --- a/test/dorkboxTest/network/AeronPubSubTest.kt +++ b/test/dorkboxTest/network/AeronPubSubTest.kt @@ -32,7 +32,7 @@ import java.util.concurrent.* class AeronPubSubTest : BaseTest() { @Test - fun connectTest() { + fun connectTestNormalPub() { val log = LoggerFactory.getLogger("ConnectTest") // NOTE: once a config is assigned to a driver, the config cannot be changed @@ -114,6 +114,89 @@ class AeronPubSubTest : BaseTest() { Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed()) } + @Test + fun connectTestExclusivePub() { + val log = LoggerFactory.getLogger("ConnectTest") + + // NOTE: once a config is assigned to a driver, the config cannot be changed + val totalCount = 40 + val port = 3535 + val serverStreamId = 55555 + val handshakeTimeoutNs = TimeUnit.SECONDS.toNanos(10) + + + + val serverDriver = run { + val conf = serverConfig() + conf.enableIPv6 = false + conf.uniqueAeronDirectory = true + + val driver = AeronDriver(conf, log, null) + driver.start() + driver + } + + val clientDrivers = mutableListOf() + val clientPublications = mutableListOf>() + + + for (i in 1..totalCount) { + val conf = clientConfig() + conf.enableIPv6 = false + conf.uniqueAeronDirectory = true + + val driver = AeronDriver(conf, log, null) + driver.start() + + clientDrivers.add(driver) + } + + + + val subscriptionUri = AeronDriver.uriHandshake(CommonContext.UDP_MEDIA, true) + .endpoint(true, "127.0.0.1", port) + val sub = serverDriver.addSubscription(subscriptionUri, serverStreamId, "server", false) + + var sessionID = 1234567 + clientDrivers.forEachIndexed { index, clientDriver -> + val publicationUri = AeronDriver.uri(CommonContext.UDP_MEDIA, sessionID++, true) + .endpoint(true, "127.0.0.1", port) + + // can throw an exception! We catch it in the calling class + val publication = clientDriver.addExclusivePublication(publicationUri, serverStreamId, "client_$index", false) + + // can throw an exception! We catch it in the calling class + // we actually have to wait for it to connect before we continue + clientDriver.waitForConnection(publication, handshakeTimeoutNs, "client_$index") { cause -> + ClientTimedOutException("Client publication cannot connect with localhost server", cause) + } + + clientPublications.add(Pair(clientDriver, publication)) + } + + + clientPublications.forEachIndexed { index, (clientDriver, pub) -> + clientDriver.close(pub, "client_$index") + } + + serverDriver.close(sub, "server") + + + clientDrivers.forEach { clientDriver -> + clientDriver.close() + } + + clientDrivers.forEach { clientDriver -> + clientDriver.ensureStopped(10_000, 500) + } + + serverDriver.close() + serverDriver.ensureStopped(10_000, 500) + + // have to make sure that the aeron driver is CLOSED. + Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed()) + } + @Test fun reconnectTest() { val log = LoggerFactory.getLogger("ConnectTest") diff --git a/test/dorkboxTest/network/MultiClientTest.kt b/test/dorkboxTest/network/MultiClientTest.kt index 4da9244c..b2511f76 100644 --- a/test/dorkboxTest/network/MultiClientTest.kt +++ b/test/dorkboxTest/network/MultiClientTest.kt @@ -34,7 +34,7 @@ import java.util.concurrent.* @Suppress("UNUSED_ANONYMOUS_PARAMETER") class MultiClientTest : BaseTest() { // this can be upped to 100 for stress testing, but for general unit tests this should be smaller (as this is sensitive on the load of the machine) - private val totalCount = 10 + private val totalCount = 80 private val clientConnectCount = atomic(0) private val serverConnectCount = atomic(0)