Updated unit tests APIs

This commit is contained in:
Robinson 2023-06-16 14:15:27 +02:00
parent 173ad3a691
commit bc0a0e2dcc
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
7 changed files with 228 additions and 81 deletions

View File

@ -22,6 +22,7 @@ import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.Publication import io.aeron.Publication
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import mu.KotlinLogging import mu.KotlinLogging
import org.junit.Assert
import org.junit.Test import org.junit.Test
class AeronPubSubTest : BaseTest() { class AeronPubSubTest : BaseTest() {
@ -99,7 +100,7 @@ class AeronPubSubTest : BaseTest() {
} }
} }
@Test(expected = ClientTimedOutException::class) @Test()
fun connectFailWithBadSessionIdTest() { fun connectFailWithBadSessionIdTest() {
runBlocking { runBlocking {
val log = KotlinLogging.logger("ConnectTest") val log = KotlinLogging.logger("ConnectTest")
@ -142,14 +143,18 @@ class AeronPubSubTest : BaseTest() {
val subscriptionUri = AeronDriver.uriHandshake("udp", true).endpoint(true, "127.0.0.1", port) val subscriptionUri = AeronDriver.uriHandshake("udp", true).endpoint(true, "127.0.0.1", port)
val sub = serverDriver.addSubscription(subscriptionUri, serverStreamId, "server") val sub = serverDriver.addSubscription(subscriptionUri, serverStreamId, "server")
var sessionID = 1234567 try {
clientDrivers.forEachIndexed { index, clientDriver -> var sessionID = 1234567
val publicationUri = AeronDriver.uri("udp", sessionID, true).endpoint(true, "127.0.0.1", port) clientDrivers.forEachIndexed { index, clientDriver ->
clientDriver.addPublicationWithTimeout(publicationUri, handshakeTimeoutSec, serverStreamId, "client_$index") { cause -> val publicationUri = AeronDriver.uri("udp", sessionID, true).endpoint(true, "127.0.0.1", port)
ClientTimedOutException("Client publication cannot connect with localhost server", cause) clientDriver.addPublicationWithTimeout(publicationUri, handshakeTimeoutSec, serverStreamId, "client_$index") { cause ->
}.also { ClientTimedOutException("Client publication cannot connect with localhost server", cause)
clientPublications.add(Pair(clientDriver, it)) }.also {
clientPublications.add(Pair(clientDriver, it))
}
} }
Assert.fail("TimeoutException should be caught!")
} catch (ignore: Exception) {
} }

View File

@ -195,17 +195,27 @@ abstract class BaseTest {
/** /**
* Immediately stop the endpoints * Immediately stop the endpoints
*
* Can stop from inside different callbacks
* - message
* - connect
* - disconnect
*/ */
fun stopEndPoints(stopAfterMillis: Long = 0L) { fun stopEndPointsBlocking(stopAfterMillis: Long = 0L) {
runBlocking { runBlocking {
stopEndPointsSuspending(stopAfterMillis) stopEndPoints(stopAfterMillis)
} }
} }
/** /**
* Immediately stop the endpoints * Immediately stop the endpoints
*
* Can stop from inside different callbacks
* - message
* - connect
* - disconnect
*/ */
suspend fun stopEndPointsSuspending(stopAfterMillis: Long = 0L) { suspend fun stopEndPoints(stopAfterMillis: Long = 0L) {
if (isStopping) { if (isStopping) {
return return
} }
@ -225,7 +235,8 @@ abstract class BaseTest {
// shutdown clients first // shutdown clients first
clients.forEach { endPoint -> clients.forEach { endPoint ->
// we are ASYNC, so we must use callbacks to execute code // we are ASYNC, so we must use callbacks to execute code
endPoint.close { endPoint.close(true) {
logger.error("Closed client connection")
latch.countDown() latch.countDown()
} }
} }
@ -236,7 +247,8 @@ abstract class BaseTest {
// shutdown everything else (should only be servers) last // shutdown everything else (should only be servers) last
servers.forEach { servers.forEach {
it.close { it.close(true) {
logger.error("Closed server connection")
latch.countDown() latch.countDown()
} }
} }
@ -259,6 +271,9 @@ abstract class BaseTest {
false false
} }
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
logger.error("Shut down all endpoints... Success($error)") logger.error("Shut down all endpoints... Success($error)")
} }
/** /**
@ -269,18 +284,18 @@ abstract class BaseTest {
* @param stopAfterSeconds how many seconds to wait, the default is 2 minutes. * @param stopAfterSeconds how many seconds to wait, the default is 2 minutes.
*/ */
fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT, preShutdownAction: () -> Unit = {}) { fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT, preShutdownAction: () -> Unit = {}) {
var latchTriggered = try { var latchTriggered = runBlocking {
runBlocking { try {
if (stopAfterSeconds == 0L || EndPoint.DEBUG_CONNECTIONS) { if (stopAfterSeconds == 0L || EndPoint.DEBUG_CONNECTIONS) {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS) latch.await(Long.MAX_VALUE, TimeUnit.SECONDS)
} else { } else {
latch.await(stopAfterSeconds, TimeUnit.SECONDS) latch.await(stopAfterSeconds, TimeUnit.SECONDS)
} }
} catch (e: Exception) {
e.printStackTrace()
stopEndPoints()
false
} }
} catch (e: Exception) {
e.printStackTrace()
stopEndPoints()
false
} }
// run actions before we actually shutdown, but after we wait // run actions before we actually shutdown, but after we wait
@ -295,7 +310,7 @@ abstract class BaseTest {
// always stop the endpoints (even if we already called this) // always stop the endpoints (even if we already called this)
try { try {
stopEndPoints() stopEndPointsBlocking()
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
} }
@ -334,9 +349,7 @@ abstract class BaseTest {
// if the thread is interrupted, then it means we finished the test. // if the thread is interrupted, then it means we finished the test.
LoggerFactory.getLogger(this.javaClass.simpleName).error("Test did not complete in a timely manner...") LoggerFactory.getLogger(this.javaClass.simpleName).error("Test did not complete in a timely manner...")
runBlocking { stopEndPointsBlocking()
stopEndPoints()
}
Assert.fail("Test did not complete in a timely manner.") Assert.fail("Test did not complete in a timely manner.")
} catch (ignored: InterruptedException) { } catch (ignored: InterruptedException) {
} }

View File

@ -57,18 +57,17 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
waitForThreads() waitForThreads()
Assert.assertTrue(serverConnectSuccess.value) Assert.assertTrue(serverConnectSuccess.value)
@ -85,9 +84,9 @@ class ConnectionFilterTest : BaseTest() {
val server: Server<Connection> = Server(configuration) val server: Server<Connection> = Server(configuration)
addEndPoint(server) addEndPoint(server)
server.bind()
server.filter(IpSubnetFilterRule(IPv4.WILDCARD, 0)) server.filter(IpSubnetFilterRule(IPv4.WILDCARD, 0))
server.filter(IpSubnetFilterRule(IPv6.WILDCARD, 0)) server.filter(IpSubnetFilterRule(IPv6.WILDCARD, 0))
server.bind()
server.onConnect { server.onConnect {
serverConnectSuccess.value = true serverConnectSuccess.value = true
@ -106,13 +105,13 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -154,13 +153,13 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -203,13 +202,13 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -245,13 +244,13 @@ class ConnectionFilterTest : BaseTest() {
addEndPoint(client) addEndPoint(client)
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -297,14 +296,15 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() logger.error { "STARTING TO CLOSE CLIENT" }
stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
stopEndPoints() stopEndPointsBlocking()
// this is expected. // this is expected.
} }
} }
@ -338,13 +338,13 @@ class ConnectionFilterTest : BaseTest() {
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -385,18 +385,17 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
waitForThreads() waitForThreads()
Assert.assertTrue(serverConnectSuccess.value) Assert.assertTrue(serverConnectSuccess.value)
@ -432,13 +431,13 @@ class ConnectionFilterTest : BaseTest() {
} }
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -475,13 +474,13 @@ class ConnectionFilterTest : BaseTest() {
addEndPoint(client) addEndPoint(client)
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }
@ -514,13 +513,13 @@ class ConnectionFilterTest : BaseTest() {
client.onDisconnect { client.onDisconnect {
stopEndPointsSuspending() stopEndPoints()
} }
try { try {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} catch (e: Exception) { } catch (e: Exception) {
stopEndPoints() stopEndPointsBlocking()
throw e throw e
} }
} }

View File

@ -68,7 +68,7 @@ class ErrorLoggerTest : BaseTest() {
client.onConnect { client.onConnect {
// can be any message, we just want the error-log to log something // can be any message, we just want the error-log to log something
send(TestObj()) send(TestObj())
stopEndPointsSuspending() stopEndPoints()
} }
client.connect(LOCALHOST) client.connect(LOCALHOST)

View File

@ -60,7 +60,7 @@ class MultiClientTest : BaseTest() {
// if we DO NOT wait, what will happen is that the client will CLOSE before it receives the handshake HELLO_ACK // if we DO NOT wait, what will happen is that the client will CLOSE before it receives the handshake HELLO_ACK
// delay(500) // delay(500)
stopEndPointsSuspending() stopEndPoints()
} }
} }

View File

@ -48,8 +48,6 @@ class PingTest : BaseTest() {
ping { ping {
// a ping object is returned, once the round-trip is complete // a ping object is returned, once the round-trip is complete
val count = counter.getAndIncrement() val count = counter.getAndIncrement()
println(count)
if (count == 99) { if (count == 99) {
clientSuccess.value = true clientSuccess.value = true
@ -66,7 +64,7 @@ class PingTest : BaseTest() {
client.connect(LOCALHOST) client.connect(LOCALHOST)
} }
waitForThreads(1500) waitForThreads()
Assert.assertTrue(clientSuccess.value) Assert.assertTrue(clientSuccess.value)
} }

View File

@ -46,59 +46,120 @@ class SimpleTest : BaseTest() {
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIp4() { fun simpleIp4Server() {
simple(ConnectType.IP4) simpleServerShutdown(ConnectType.IP4)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIp6() { fun simpleIp6Server() {
simple(ConnectType.IP6) simpleServerShutdown(ConnectType.IP6)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIp46() { fun simpleIp46Server() {
simple(ConnectType.IP46) simpleServerShutdown(ConnectType.IP46)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIp64() { fun simpleIp64Server() {
simple(ConnectType.IP64) simpleServerShutdown(ConnectType.IP64)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIpc() { fun simpleIpcServer() {
simple(ConnectType.IPC) simpleServerShutdown(ConnectType.IPC)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIpc4Fallback() { fun simpleIpc4FallbackServer() {
simple(ConnectType.IPC4, ConnectType.IPC) simpleServerShutdown(ConnectType.IPC4, ConnectType.IPC)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIpc6Fallback() { fun simpleIpc6FallbackServer() {
simple(ConnectType.IPC6 , ConnectType.IPC) simpleServerShutdown(ConnectType.IPC6, ConnectType.IPC)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIpc46Fallback() { fun simpleIpc46FallbackServer() {
simple(ConnectType.IPC46 , ConnectType.IPC) simpleServerShutdown(ConnectType.IPC46, ConnectType.IPC)
} }
@Test @Test
@Throws(SecurityException::class, IOException::class) @Throws(SecurityException::class, IOException::class)
fun simpleIpc64Fallback() { fun simpleIpc64FallbackServer() {
simple(ConnectType.IPC64 , ConnectType.IPC) simpleServerShutdown(ConnectType.IPC64, ConnectType.IPC)
} }
private fun simple(clientType: ConnectType, serverType: ConnectType = clientType) {
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIp4Client() {
simpleClientShutdown(ConnectType.IP4)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIp6Client() {
simpleClientShutdown(ConnectType.IP6)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIp46Client() {
simpleClientShutdown(ConnectType.IP46)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIp64Client() {
simpleClientShutdown(ConnectType.IP64)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIpcClient() {
simpleClientShutdown(ConnectType.IPC)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIpc4FallbackClient() {
simpleClientShutdown(ConnectType.IPC4, ConnectType.IPC)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIpc6FallbackClient() {
simpleClientShutdown(ConnectType.IPC6, ConnectType.IPC)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIpc46FallbackClient() {
simpleClientShutdown(ConnectType.IPC46, ConnectType.IPC)
}
@Test
@Throws(SecurityException::class, IOException::class)
fun simpleIpc64FallbackClient() {
simpleClientShutdown(ConnectType.IPC64, ConnectType.IPC)
}
// shutdown from the server
private fun simpleServerShutdown(clientType: ConnectType, serverType: ConnectType = clientType) {
received.set(false) received.set(false)
sent.set(false) sent.set(false)
@ -120,6 +181,8 @@ class SimpleTest : BaseTest() {
received.set(true) received.set(true)
logger.error("Done, stopping endpoints") logger.error("Done, stopping endpoints")
// this must NOT be on the disconenct thread, because we cancel it!
stopEndPoints() stopEndPoints()
} }
@ -144,15 +207,84 @@ class SimpleTest : BaseTest() {
} }
when (clientType) { when (clientType) {
ConnectType.IPC -> client.connect() ConnectType.IPC -> { client.connect() }
ConnectType.IPC4 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IPC6 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IPC46 -> client.connect(IPv4.LOCALHOST) ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IPC64 -> client.connect(IPv6.LOCALHOST) ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IP4 -> client.connect(IPv4.LOCALHOST) ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IP6 -> client.connect(IPv6.LOCALHOST) ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IP46 -> client.connect(IPv4.LOCALHOST) ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IP64 -> client.connect(IPv6.LOCALHOST) ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST) }
}
}
waitForThreads()
assertTrue(sent.get())
assertTrue(received.get())
}
// shutdown from the client
private fun simpleClientShutdown(clientType: ConnectType, serverType: ConnectType = clientType) {
received.set(false)
sent.set(false)
run {
val configuration = serverConfig()
configuration.port = 12312
configuration.enableIPv4 = serverType.ip4
configuration.enableIPv6 = serverType.ip6
configuration.enableIpc = serverType.ipc
val server: Server<Connection> = Server(configuration)
addEndPoint(server)
server.onMessage<String> { message ->
if (message != "client") {
Assert.fail()
}
received.set(true)
logger.error("Done, stopping endpoints")
close()
}
server.bind()
}
run {
val configuration = clientConfig()
configuration.port = 12312
configuration.enableIPv4 = clientType.ip4
configuration.enableIPv6 = clientType.ip6
configuration.enableIpc = clientType.ipc
val client: Client<Connection> = Client(configuration)
addEndPoint(client)
client.onConnect {
sent.set(true)
send("client")
}
client.onDisconnect {
stopEndPoints()
}
when (clientType) {
ConnectType.IPC -> { client.connect() }
ConnectType.IPC4 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IPC6 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IPC46 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IPC64 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IP4 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IP6 -> { client.connect(IPv6.LOCALHOST) }
ConnectType.IP46 -> { client.connect(IPv4.LOCALHOST) }
ConnectType.IP64 -> { client.connect(IPv6.LOCALHOST) }
} }
} }