updated unit tests

This commit is contained in:
Robinson 2023-06-25 17:25:29 +02:00
parent d63b8a6514
commit a81e5316c7
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
17 changed files with 227 additions and 210 deletions

View File

@ -97,6 +97,9 @@ class AeronPubSubTest : BaseTest() {
serverDriver.close()
serverDriver.ensureStopped(10_000, 500)
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
}
}
@ -175,6 +178,9 @@ class AeronPubSubTest : BaseTest() {
serverDriver.close()
serverDriver.ensureStopped(10_000, 500)
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
}
}
}

View File

@ -46,7 +46,7 @@ import java.util.concurrent.atomic.*
@Suppress("UNUSED_ANONYMOUS_PARAMETER")
class AeronRmiClientServer {
companion object {
val counter = AtomicInteger(0)
private val counter = AtomicInteger(0)
init {
try {

View File

@ -29,13 +29,16 @@ import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.EventDispatcher
import dorkbox.os.OS
import dorkbox.storage.Storage
import dorkbox.util.entropy.Entropy
import dorkbox.util.entropy.SimpleEntropy
import dorkbox.util.exceptions.InitializationException
import dorkbox.util.sync.CountingLatch
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.After
import org.junit.Assert
@ -45,6 +48,7 @@ import java.lang.reflect.Field
import java.lang.reflect.Method
import java.util.concurrent.*
@OptIn(DelicateCoroutinesApi::class)
abstract class BaseTest {
companion object {
const val LOCALHOST = "localhost"
@ -98,6 +102,7 @@ abstract class BaseTest {
configuration.port = 2000
configuration.enableIpc = false
configuration.enableIPv6 = false
block(configuration)
return configuration
@ -109,6 +114,7 @@ abstract class BaseTest {
configuration.port = 2000
configuration.enableIpc = false
configuration.enableIPv6 = false
configuration.maxClientCount = 50
configuration.maxConnectionsPerIpAddress = 50
@ -158,8 +164,6 @@ abstract class BaseTest {
}
}
private val latch = CountingLatch()
@Volatile
private var autoFailThread: Thread? = null
@ -175,22 +179,18 @@ abstract class BaseTest {
logger.error("---- " + this.javaClass.simpleName)
// we must always make sure that aeron is shut-down before starting again.
if (!Server.ensureStopped(serverConfig()) || !Client.ensureStopped(clientConfig())) {
throw IllegalStateException("Unable to continue, AERON was unable to stop.")
}
AeronDriver.checkForMemoryLeaks()
}
fun addEndPoint(endPoint: EndPoint<*>) {
endPoint.onInit { logger.error { "init" } }
endPoint.onConnect { logger.error { "connect" } }
endPoint.onDisconnect { logger.error { "disconnect" } }
endPoint.onInit { logger.error { "UNIT TEST: init" } }
endPoint.onConnect { logger.error { "UNIT TEST: connect" } }
endPoint.onDisconnect { logger.error { "UNIT TEST: disconnect" } }
endPoint.onError { logger.error(it) { "ERROR!" } }
endPoint.onError { logger.error(it) { "UNIT TEST: ERROR!" } }
endPointConnections.add(endPoint)
latch.countUp()
}
/**
@ -219,6 +219,18 @@ abstract class BaseTest {
if (isStopping) {
return
}
if (EventDispatcher.isCurrentEvent()) {
// we want to redispatch, in the event we are already running inside the event dispatch
// this gives us the chance to properly exit/close WITHOUT blocking currentEventDispatch
// during the `waitForClose()` call
GlobalScope.launch {
stopEndPoints(stopAfterMillis)
}
return
}
isStopping = true
if (stopAfterMillis > 0L) {
@ -231,50 +243,44 @@ abstract class BaseTest {
logger.error("Unit test shutting down ${clients.size} clients...")
logger.error("Unit test shutting down ${servers.size} server...")
val timeoutMS = 0L
// val timeoutMS = if (EndPoint.DEBUG_CONNECTIONS) {
// Long.MAX_VALUE
// } else {
// TimeUnit.SECONDS.toMillis(AUTO_FAIL_TIMEOUT)
// }
var success = true
// shutdown clients first
clients.forEach { endPoint ->
// we are ASYNC, so we must use callbacks to execute code
endPoint.close(true) {
logger.error("Closed client connection")
latch.countDown()
}
endPoint.close()
}
clients.forEach { endPoint ->
endPoint.waitForClose()
success = success && endPoint.waitForClose(timeoutMS)
endPoint.stopDriver()
}
// shutdown everything else (should only be servers) last
servers.forEach {
it.close(true) {
logger.error("Closed server connection")
latch.countDown()
}
it.close()
}
servers.forEach { endPoint ->
endPoint.waitForClose()
success = success && endPoint.waitForClose(timeoutMS)
endPoint.stopDriver()
}
endPointConnections.clear()
// now we have to wait for the close events to finish running.
val error = try {
if (EndPoint.DEBUG_CONNECTIONS) {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS)
} else {
latch.await(AUTO_FAIL_TIMEOUT, TimeUnit.SECONDS)
}
} catch (e: Exception) {
e.printStackTrace()
false
}
logger.error("UNIT TEST, checking driver and memory leaks")
// have to make sure that the aeron driver is CLOSED.
Assert.assertTrue("The aeron drivers are not fully closed!", AeronDriver.areAllInstancesClosed())
AeronDriver.checkForMemoryLeaks()
logger.error("Shut down all endpoints... Success($error)")
logger.error("Shut down all endpoints... Success($success)")
}
/**
* Wait for network client/server threads to shut down for the specified time. 0 will wait forever
@ -283,60 +289,45 @@ abstract class BaseTest {
*
* @param stopAfterSeconds how many seconds to wait, the default is 2 minutes.
*/
fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT, preShutdownAction: () -> Unit = {}) {
var latchTriggered = runBlocking {
try {
if (stopAfterSeconds == 0L || EndPoint.DEBUG_CONNECTIONS) {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS)
} else {
latch.await(stopAfterSeconds, TimeUnit.SECONDS)
}
} catch (e: Exception) {
e.printStackTrace()
stopEndPoints()
false
}
fun waitForThreads(stopAfterSeconds: Long = AUTO_FAIL_TIMEOUT) = runBlocking {
val clients = endPointConnections.filterIsInstance<Client<Connection>>()
val servers = endPointConnections.filterIsInstance<Server<Connection>>()
val timeoutMS = 0L
// val timeoutMS = if (stopAfterSeconds == 0L || EndPoint.DEBUG_CONNECTIONS) {
// Long.MAX_VALUE
// } else {
// TimeUnit.SECONDS.toMillis(stopAfterSeconds)
// }
var success = true
clients.forEach { endPoint ->
success = success && endPoint.waitForClose(timeoutMS)
endPoint.stopDriver()
}
servers.forEach { endPoint ->
success = success && endPoint.waitForClose(timeoutMS)
endPoint.stopDriver()
}
// run actions before we actually shutdown, but after we wait
if (latchTriggered) {
preShutdownAction()
} else {
println("LATCH NOT TRIGGERED")
println("LATCH NOT TRIGGERED")
println("LATCH NOT TRIGGERED")
println("LATCH NOT TRIGGERED")
if (!success) {
Assert.fail("Shutdown latch not triggered!")
}
// always stop the endpoints (even if we already called this)
try {
stopEndPointsBlocking()
} catch (e: Exception) {
e.printStackTrace()
if (!AeronDriver.areAllInstancesClosed(logger)) {
throw RuntimeException("Unable to shutdown! There are still Aeron drivers loaded!")
}
// still. we must WAIT for it to finish!
latchTriggered = try {
runBlocking {
if (stopAfterSeconds == 0L || EndPoint.DEBUG_CONNECTIONS) {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS)
} else {
latch.await(stopAfterSeconds, TimeUnit.SECONDS)
}
}
} catch (e: InterruptedException) {
e.printStackTrace()
false
// we must always make sure that aeron is shut-down before starting again.
if (!Server.ensureStopped(serverConfig()) || !Client.ensureStopped(clientConfig())) {
throw IllegalStateException("Unable to continue, AERON was unable to stop.")
}
logger.error("Finished shutting ($latchTriggered) down all endpoints...")
AeronDriver.checkForMemoryLeaks()
runBlocking {
if (!AeronDriver.areAllInstancesClosed(logger)) {
throw RuntimeException("Unable to shutdown! There are still Aeron drivers loaded!")
}
}
logger.error("Finished shutting down all endpoints... ($success)")
}
@Before

View File

@ -27,13 +27,16 @@ import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.junit.Assert
import org.junit.Test
import java.io.IOException
import java.util.concurrent.*
class DisconnectReconnectTest : BaseTest() {
private val reconnectCount = atomic(0)
private val reconnects = 2
@Test
fun reconnectClient() {
val latch = CountDownLatch(reconnects+1)
val reconnectCount = atomic(0)
run {
val configuration = serverConfig()
@ -56,37 +59,33 @@ class DisconnectReconnectTest : BaseTest() {
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onDisconnect {
latch.countDown()
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
if (count < reconnects) {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
client.connect(LOCALHOST)
}
}
client.connect(LOCALHOST)
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
Assert.assertEquals(reconnects+1, reconnectCount.value)
}
@Test
fun reconnectClientViaClientClose() {
val latch = CountDownLatch(reconnects+1)
val reconnectCount = atomic(0)
run {
val configuration = serverConfig {
uniqueAeronDirectory = true
@ -114,15 +113,13 @@ class DisconnectReconnectTest : BaseTest() {
}
client.onDisconnect {
latch.countDown()
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count < 3) {
if (count < reconnects) {
logger.error("Reconnecting: $count")
client.connect(LOCALHOST)
} else {
logger.error("Shutting down")
stopEndPoints()
}
}
@ -130,10 +127,12 @@ class DisconnectReconnectTest : BaseTest() {
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
Assert.assertEquals(reconnects+1, reconnectCount.value)
}
interface CloseIface {
@ -155,6 +154,9 @@ class DisconnectReconnectTest : BaseTest() {
@Test
fun reconnectRmiClient() {
val latch = CountDownLatch(reconnects+1)
val reconnectCount = atomic(0)
val CLOSE_ID = 33
run {
@ -194,35 +196,32 @@ class DisconnectReconnectTest : BaseTest() {
}
client.onDisconnect {
latch.countDown()
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
if (count < reconnects) {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
client.connect(LOCALHOST)
}
}
client.connect(LOCALHOST)
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
Assert.assertEquals(reconnects+1, reconnectCount.value)
}
@Test
fun manualMediaDriverAndReconnectClient() {
val latch = CountDownLatch(reconnects+1)
val reconnectCount = atomic(0)
val log = KotlinLogging.logger("DCUnitTest")
// NOTE: once a config is assigned to a driver, the config cannot be changed
val aeronDriver = runBlocking {
@ -254,20 +253,13 @@ class DisconnectReconnectTest : BaseTest() {
client.onDisconnect {
latch.countDown()
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
if (count < reconnects) {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
client.connect(LOCALHOST)
}
}
@ -275,17 +267,23 @@ class DisconnectReconnectTest : BaseTest() {
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
runBlocking {
aeronDriver.close()
}
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
Assert.assertEquals(reconnects+1, reconnectCount.value)
}
@Test
fun reconnectWithFallbackClient() {
val latch = CountDownLatch(reconnects+1)
val reconnectCount = atomic(0)
// this tests IPC with fallback to UDP (because the server has IPC disabled, and the client has it enabled)
run {
val config = serverConfig()
@ -314,30 +312,24 @@ class DisconnectReconnectTest : BaseTest() {
client.onDisconnect {
logger.error("Disconnected!")
latch.countDown()
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Count reached, shutting down")
stopEndPoints()
}
else {
if (count < reconnects) {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
client.connect(LOCALHOST)
}
}
client.connect(LOCALHOST)
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
Assert.assertEquals(reconnects+1, reconnectCount.value)
}
@Test
@ -378,7 +370,10 @@ class DisconnectReconnectTest : BaseTest() {
}
server.close()
runBlocking {
client.waitForClose()
server.waitForClose()
}
waitForThreads()
}

View File

@ -18,6 +18,7 @@ package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import kotlinx.coroutines.delay
import org.junit.Test
class ErrorLoggerTest : BaseTest() {
@ -68,6 +69,8 @@ class ErrorLoggerTest : BaseTest() {
client.onConnect {
// can be any message, we just want the error-log to log something
send(TestObj())
delay(200)
stopEndPoints()
}

View File

@ -17,30 +17,34 @@
package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.util.NamedThreadFactory
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Test
import java.text.SimpleDateFormat
import java.util.*
import java.util.concurrent.*
@Suppress("UNUSED_ANONYMOUS_PARAMETER")
class MultiClientTest : BaseTest() {
private val totalCount = 80
private val totalCount = 40
private val clientConnectCount = atomic(0)
private val serverConnectCount = atomic(0)
private val disconnectCount = atomic(0)
@OptIn(DelicateCoroutinesApi::class)
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
@Test
fun multiConnectClient() {
val server = run {
run {
val configuration = serverConfig()
configuration.enableIPv6 = false
configuration.uniqueAeronDirectory = true
@ -52,20 +56,9 @@ class MultiClientTest : BaseTest() {
logger.error("${this.id} - Connected $count ....")
close()
if (count == totalCount) {
logger.error { "Stopping endpoints!" }
// waiting just a few so that we can make sure that the handshake messages are properly sent
// if we DO NOT wait, what will happen is that the client will CLOSE before it receives the handshake HELLO_ACK
// delay(500)
stopEndPoints()
}
}
server.bind()
server
}
@ -74,6 +67,7 @@ class MultiClientTest : BaseTest() {
println()
println()
val shutdownLatch = CountDownLatch(totalCount)
// clients first, so they try to connect to the server at (roughly) the same time
@ -84,7 +78,7 @@ class MultiClientTest : BaseTest() {
config.uniqueAeronDirectory = true
val client: Client<Connection> = Client(config, "Client $i")
client.onConnect {
client.onInit {
val count = clientConnectCount.incrementAndGet()
logger.error("$id - Connected $count ($i)!")
}
@ -92,6 +86,7 @@ class MultiClientTest : BaseTest() {
client.onDisconnect {
val count = disconnectCount.incrementAndGet()
logger.error("$id - Disconnected $count ($i)!")
shutdownLatch.countDown()
}
addEndPoint(client)
@ -101,24 +96,25 @@ class MultiClientTest : BaseTest() {
// start up the drivers first
runBlocking {
clients.forEach {
println("******************")
it.startDriver()
println("******************")
}
}
// if we are on the same JVM, the defaultScope for coroutines is SHARED, and limited!
val differentThreadLaunchers = Executors.newFixedThreadPool(8,
NamedThreadFactory("Unit Test Client", Configuration.networkThreadGroup, true)
).asCoroutineDispatcher()
clients.forEach {
GlobalScope.launch {
// long connection timeout, since the more that try to connect at the same time, the longer it takes to setup aeron (since it's all shared)
it.connect(LOCALHOST, 30)
runBlocking {
clients.forEach {
launch(differentThreadLaunchers) {
it.connect(LOCALHOST, 30)
}
}
}
waitForThreads(totalCount*AUTO_FAIL_TIMEOUT) {
outputStats(server)
}
shutdownLatch.await()
stopEndPointsBlocking()
Assert.assertEquals(totalCount, clientConnectCount.value)
Assert.assertEquals(totalCount, serverConnectCount.value)

View File

@ -39,7 +39,9 @@ import dorkbox.network.Client
import dorkbox.network.Configuration
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.exceptions.ServerException
import dorkbox.util.exceptions.SecurityException
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
@ -49,24 +51,22 @@ import java.io.IOException
import java.util.concurrent.atomic.*
class MultipleServerTest : BaseTest() {
val total = 1
var received = AtomicInteger()
private val total = 4
@Test
@Throws(SecurityException::class, IOException::class)
fun multipleUDP() {
val portOffset = 2
received.set(0)
val received = AtomicInteger(0)
var serverAeronDir: File? = null
val didReceive = mutableListOf<AtomicBoolean>()
for (count in 0 until total) {
didReceive.add(AtomicBoolean())
val offset = count * portOffset
val configuration = serverConfig()
configuration.port += offset
configuration.port += count
configuration.aeronDirectory = serverAeronDir
configuration.enableIpc = false
@ -95,10 +95,8 @@ class MultipleServerTest : BaseTest() {
for (count in 0 until total) {
didSend.add(AtomicBoolean())
val offset = count * portOffset
val configuration = clientConfig()
configuration.port += offset
configuration.port += count
configuration.aeronDirectory = clientAeronDir
configuration.enableIpc = false
@ -126,10 +124,35 @@ class MultipleServerTest : BaseTest() {
}
}
@Test(expected = ServerException::class)
@Throws(SecurityException::class, IOException::class)
fun multipleInvalidIPC() {
val servers = mutableListOf<Server<Connection>>()
try {
for (count in 0 until total) {
val configuration = serverConfig()
configuration.enableIPv4 = true
configuration.enableIPv6 = true
configuration.enableIpc = true
val server: Server<Connection>?
servers.add(Server(configuration, "server_$count"))
}
} catch (e: Exception) {
runBlocking {
servers.forEach {
it.close()
it.waitForClose()
}
}
throw e
}
}
@Test
@Throws(SecurityException::class, IOException::class)
fun multipleIPC() {
received.set(0)
val received = AtomicInteger(0)
// client and server must share locations
val aeronDirs = mutableListOf<File>()
@ -145,7 +168,9 @@ class MultipleServerTest : BaseTest() {
didReceive.add(AtomicBoolean())
val configuration = serverConfig()
configuration.aeronDirectory = aeronDirs.get(count)
configuration.aeronDirectory = aeronDirs[count]
configuration.enableIPv4 = false
configuration.enableIPv6 = false
configuration.enableIpc = true
val server: Server<Connection> = Server(configuration, "server_$count")
@ -178,7 +203,9 @@ class MultipleServerTest : BaseTest() {
didSend.add(AtomicBoolean())
val configuration = clientConfig()
configuration.aeronDirectory = aeronDirs.get(count)
configuration.aeronDirectory = aeronDirs[count]
configuration.enableIPv4 = false
configuration.enableIPv6 = false
configuration.enableIpc = true
val client: Client<Connection> = Client(configuration, "client_$count")

View File

@ -27,6 +27,7 @@ class PingTest : BaseTest() {
val counter = atomic(0)
@Test
fun RmiPing() {
// session/stream count errors
val clientSuccess = atomic(false)
run {

View File

@ -28,6 +28,7 @@ import org.junit.Test
class RoundTripMessageTest : BaseTest() {
@Test
fun MessagePing() {
// session/stream count errors
val serverSuccess = atomic(false)
val clientSuccess = atomic(false)

View File

@ -25,6 +25,7 @@ import org.junit.Test
class SerializationValidationTest : BaseTest() {
@Test
fun checkManyObjects() {
// session/stream count errors
run {
val configuration = serverConfig()
register(configuration.serialization)
@ -57,6 +58,7 @@ class SerializationValidationTest : BaseTest() {
@Test
fun checkOutOfOrder() {
// session/stream count errors
run {
val configuration = serverConfig()
configuration.serialization.rmi.register(TestObject::class.java, TestObjectImpl::class.java)

View File

@ -28,8 +28,8 @@ import java.io.IOException
import java.util.concurrent.atomic.*
class SimpleTest : BaseTest() {
var received = AtomicBoolean()
val sent = AtomicBoolean()
private var received = AtomicBoolean()
private val sent = AtomicBoolean()
enum class ConnectType(val ip4: Boolean, val ip6: Boolean, val ipc: Boolean) {
IPC(false, false, true),
@ -182,7 +182,7 @@ class SimpleTest : BaseTest() {
received.set(true)
logger.error("Done, stopping endpoints")
// this must NOT be on the disconenct thread, because we cancel it!
// this must NOT be on the disconnect thread, because we cancel it!
stopEndPoints()
}

View File

@ -28,7 +28,9 @@ class StreamingTest : BaseTest() {
@Test
fun sendStreamingObject() {
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 8
// TODO: streaming data is NOT saved to temp files, it is in memory. every 16 megs should be flushed to disk (this is arbitrary and should be a config setting). if this number is too
// high, we will run out of memory
val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 16
val hugeData = ByteArray(sizeToTest)
SecureRandom().nextBytes(hugeData)

View File

@ -47,6 +47,7 @@ import dorkboxTest.network.rmi.cows.TestCow
import dorkboxTest.network.rmi.cows.TestCowImpl
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.*
class RmiDuplicateObjectTest : BaseTest() {
@Test
@ -83,13 +84,15 @@ class RmiDuplicateObjectTest : BaseTest() {
isIpv4 && isIpv6 && !runIpv4Connect -> client.connect(IPv6.LOCALHOST)
isIpv4 -> client.connect(IPv4.LOCALHOST)
isIpv6 -> client.connect(IPv6.LOCALHOST)
else -> client.connect()
else -> client.connect(IPv4.LOCALHOST)
}
}
private val objs = mutableSetOf<Int>()
fun rmi(isIpv4: Boolean = false, isIpv6: Boolean = false, runIpv4Connect: Boolean = true, config: Configuration.() -> Unit = {}) {
val latch = CountDownLatch(2)
run {
val configuration = serverConfig()
configuration.enableIPv4 = isIpv4
@ -106,6 +109,7 @@ class RmiDuplicateObjectTest : BaseTest() {
server.bind()
server.onConnect {
// these are on separate threads (client.init) and this -- there can be race conditions, where the object doesn't exist yet!
server.forEachConnection {
val testCow = it.rmi.get<TestCow>(4)
testCow.moo()
@ -113,6 +117,7 @@ class RmiDuplicateObjectTest : BaseTest() {
synchronized(objs) {
objs.add(testCow.id())
}
latch.countDown()
}
}
}
@ -145,8 +150,10 @@ class RmiDuplicateObjectTest : BaseTest() {
doConnect(isIpv4, isIpv6, runIpv4Connect, client)
}
waitForThreads(5)
latch.await()
stopEndPointsBlocking()
waitForThreads()
val actual = synchronized(objs) {
objs.joinToString()

View File

@ -31,7 +31,6 @@ import org.junit.Test
class RmiSimpleActionsTest : BaseTest() {
@Test
fun testGlobalDelete() {
val configuration = serverConfig()
configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
configuration.serialization.register(MessageWithTestCow::class.java)
@ -63,6 +62,8 @@ class RmiSimpleActionsTest : BaseTest() {
Assert.assertTrue(server.rmiGlobal.delete(testCowImpl))
Assert.assertFalse(server.rmiGlobal.delete(testCowImpl))
Assert.assertFalse(server.rmiGlobal.delete(newId2))
stopEndPointsBlocking()
}
@Test

View File

@ -22,14 +22,10 @@ import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import dorkboxTest.network.BaseTest
import kotlinx.coroutines.delay
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.atomic.*
import java.util.concurrent.*
class RmiSpamAsyncTest : BaseTest() {
private val counter = AtomicLong(0)
private val RMI_ID = 12251
@ -49,11 +45,12 @@ class RmiSpamAsyncTest : BaseTest() {
* In this test the server has two objects in an object space. The client
* uses the first remote object to get the second remote object.
*/
fun rmi(config: Configuration.() -> Unit = {}) {
private fun rmi(config: Configuration.() -> Unit = {}) {
val server: Server<Connection>
val mod = 100_000L
val totalRuns = 1_000_000L
val totalRuns = 1_000_000
val latch = CountDownLatch(totalRuns)
run {
val configuration = serverConfig()
@ -67,7 +64,7 @@ class RmiSpamAsyncTest : BaseTest() {
server = Server(configuration)
addEndPoint(server)
server.rmiGlobal.save(TestObjectImpl(counter), RMI_ID)
server.rmiGlobal.save(TestObjectImpl(latch), RMI_ID)
server.bind()
}
@ -107,37 +104,25 @@ class RmiSpamAsyncTest : BaseTest() {
e.printStackTrace()
}
}
// The async nature means that we don't know EXACTLY when all the messages will arrive. For testing, this is the closest
// we can do to attempt to have a correct info lookup.
var count = 0
while (counter.get() < totalRuns && count < 30) {
logger.error("Waiting for ${totalRuns - counter.get()} more messages...")
count++
delay(1_000)
}
// have to do this first, so it will wait for the client responses!
// if we close the client first, the connection will be closed, and the responses will never arrive to the server
stopEndPoints()
}
client.connect(LOCALHOST)
}
latch.await()
stopEndPointsBlocking()
waitForThreads()
Assert.assertEquals(totalRuns, counter.get())
}
private interface TestObject {
fun setOther(value: Long): Boolean
fun setOther(value: Int): Boolean
}
private class TestObjectImpl(private val counter: AtomicLong) : TestObject {
private class TestObjectImpl(private val latch: CountDownLatch) : TestObject {
@Override
override fun setOther(value: Long): Boolean {
counter.getAndIncrement()
override fun setOther(value: Int): Boolean {
latch.countDown()
return true
}
}

View File

@ -54,7 +54,7 @@ class RmiSpamSyncSuspendingTest : BaseTest() {
* In this test the server has two objects in an object space. The client
* uses the first remote object to get the second remote object.
*/
fun rmi(config: Configuration.() -> Unit = {}) {
private fun rmi(config: Configuration.() -> Unit = {}) {
val server: Server<Connection>
val mod = 400L

View File

@ -53,7 +53,7 @@ class RmiSpamSyncTest : BaseTest() {
* In this test the server has two objects in an object space. The client
* uses the first remote object to get the second remote object.
*/
fun rmi(config: Configuration.() -> Unit = {}) {
private fun rmi(config: Configuration.() -> Unit = {}) {
val server: Server<Connection>
val mod = 400L