Network/test/dorkboxTest/network/DisconnectReconnectTest.kt

372 lines
9.9 KiB
Kotlin
Raw Normal View History

package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
2023-02-27 11:28:24 +01:00
import mu.KotlinLogging
import org.junit.Assert
import org.junit.Test
import java.io.IOException
class DisconnectReconnectTest : BaseTest() {
private val reconnectCount = atomic(0)
@Test
fun reconnectClient() {
run {
val configuration = serverConfig()
val server: Server<Connection> = Server(configuration)
addEndPoint(server)
2020-09-02 03:18:10 +02:00
server.bind()
server.onConnect {
logger.error("Disconnecting after 2 seconds.")
delay(2000)
logger.error("Disconnecting....")
close()
}
}
run {
val config = clientConfig()
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onDisconnect {
logger.error("Disconnected!")
2020-08-27 02:34:36 +02:00
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
}
}
client.connect(LOCALHOST)
2021-04-30 18:22:38 +02:00
}
waitForThreads()
2021-04-30 18:22:38 +02:00
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
}
@Test
fun reconnectClientViaClientClose() {
run {
val configuration = serverConfig {
uniqueAeronDirectory = true
}
val server: Server<Connection> = Server(configuration)
addEndPoint(server)
server.bind()
}
run {
val config = clientConfig() {
uniqueAeronDirectory = true
}
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onConnect {
logger.error("Disconnecting after 2 seconds.")
delay(2000)
logger.error("Disconnecting....")
client.close()
}
client.onDisconnect {
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
}
}
client.connect(LOCALHOST)
}
waitForThreads()
System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
}
2021-04-30 18:22:38 +02:00
interface CloseIface {
fun close()
2021-04-30 18:22:38 +02:00
}
class CloseImpl : CloseIface {
override fun close() {
2021-04-30 18:22:38 +02:00
// the connection specific one is called instead
}
fun close(connection: Connection) {
connection.logger.error { "PRE CLOSE MESSAGE!" }
2021-04-30 18:22:38 +02:00
connection.close()
}
}
@Test
fun reconnectRmiClient() {
val CLOSE_ID = 33
run {
val config = serverConfig()
2021-07-02 11:40:20 +02:00
config.serialization.rmi.register(CloseIface::class.java)
2021-04-30 18:22:38 +02:00
val server: Server<Connection> = Server(config)
addEndPoint(server)
server.bind()
server.onConnect {
logger.error("Disconnecting after 2 seconds.")
2021-04-30 18:22:38 +02:00
delay(2000)
logger.error("Disconnecting via RMI ....")
val closerObject = rmi.get<CloseIface>(CLOSE_ID)
// the close operation will kill the connection, preventing the response from returning.
2023-02-27 11:28:24 +01:00
RemoteObject.cast(closerObject).async = true
2021-04-30 18:22:38 +02:00
closerObject.close()
}
}
run {
val config = clientConfig()
2021-07-02 11:40:20 +02:00
config.serialization.rmi.register(CloseIface::class.java, CloseImpl::class.java)
2021-04-30 18:22:38 +02:00
val client: Client<Connection> = Client(config)
addEndPoint(client)
2021-07-02 11:40:20 +02:00
client.onInit {
2021-07-02 11:40:20 +02:00
rmi.save(CloseImpl(), CLOSE_ID)
}
2021-04-30 18:22:38 +02:00
client.onDisconnect {
logger.error("Disconnected!")
2021-04-30 18:22:38 +02:00
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
2021-04-30 18:22:38 +02:00
stopEndPoints()
}
else {
logger.error("Reconnecting: $count")
2021-04-30 18:22:38 +02:00
try {
client.connect(LOCALHOST)
2021-04-30 18:22:38 +02:00
} catch (e: IOException) {
e.printStackTrace()
}
}
}
2021-04-30 18:22:38 +02:00
client.connect(LOCALHOST)
}
waitForThreads(AUTO_FAIL_TIMEOUT*10)
2020-09-10 14:40:09 +02:00
2022-07-14 04:54:50 +02:00
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
2020-09-10 14:40:09 +02:00
Assert.assertEquals(4, reconnectCount.value)
}
@Test
fun manualMediaDriverAndReconnectClient() {
2023-02-27 11:28:24 +01:00
val log = KotlinLogging.logger("DCUnitTest")
// NOTE: once a config is assigned to a driver, the config cannot be changed
2023-02-27 11:28:24 +01:00
val aeronDriver = AeronDriver.getDriver(serverConfig(), log)
runBlocking {
2023-02-27 11:28:24 +01:00
aeronDriver.start(log)
}
run {
val serverConfiguration = serverConfig()
val server: Server<Connection> = Server(serverConfiguration)
addEndPoint(server)
server.bind()
server.onConnect {
logger.error("Disconnecting after 2 seconds.")
delay(2000)
logger.error("Disconnecting....")
close()
}
}
run {
val config = clientConfig()
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onDisconnect {
logger.error("Disconnected!")
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
stopEndPoints()
}
else {
logger.error("Reconnecting: $count")
try {
client.connect(LOCALHOST)
} catch (e: IOException) {
e.printStackTrace()
}
}
}
client.connect(LOCALHOST)
}
waitForThreads()
runBlocking {
aeronDriver.close()
}
2022-07-14 04:54:50 +02:00
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
}
2020-09-10 14:40:09 +02:00
@Test
fun reconnectWithFallbackClient() {
2021-04-30 18:22:38 +02:00
// this tests IPC with fallback to UDP (because the server has IPC disabled, and the client has it enabled)
2020-09-10 14:40:09 +02:00
run {
val config = serverConfig()
config.enableIpc = false
val server: Server<Connection> = Server(config)
addEndPoint(server)
server.bind()
server.onConnect {
logger.error("Disconnecting after 2 seconds.")
2020-09-10 14:40:09 +02:00
delay(2000)
logger.error("Disconnecting....")
close()
2020-09-10 14:40:09 +02:00
}
}
run {
val config = clientConfig()
config.enableIpc = true
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onDisconnect {
logger.error("Disconnected!")
2020-09-10 14:40:09 +02:00
val count = reconnectCount.getAndIncrement()
if (count == 3) {
logger.error("Shutting down")
2020-09-10 14:40:09 +02:00
stopEndPoints()
}
else {
logger.error("Reconnecting: $count")
2020-09-10 14:40:09 +02:00
try {
client.connect(LOCALHOST)
2020-09-10 14:40:09 +02:00
} catch (e: IOException) {
e.printStackTrace()
}
}
}
client.connect(LOCALHOST)
2020-09-10 14:40:09 +02:00
}
waitForThreads()
2022-07-14 04:54:50 +02:00
//System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
Assert.assertEquals(4, reconnectCount.value)
}
@Test
fun disconnectedMediaDriver() {
val server: Server<Connection>
run {
val config = serverConfig()
config.enableIpc = false
config.uniqueAeronDirectory = true
server = Server(config)
addEndPoint(server)
server.bind()
server.onConnect {
logger.error("Connected!")
}
}
val client: Client<Connection>
run {
val config = clientConfig()
config.enableIpc = false
config.uniqueAeronDirectory = true
client = Client(config)
addEndPoint(client)
client.onConnect {
logger.error("Connected!")
}
client.onDisconnect {
stopEndPoints()
}
client.connect(LOCALHOST)
}
server.close()
waitForThreads()
}
}