// Network/test/dorkboxTest/network/DisconnectReconnectTest.kt
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.RemoteObject
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.junit.Assert
import org.junit.Test
import java.io.IOException
/**
 * Exercises disconnect / reconnect cycles between a [Client] and a [Server].
 *
 * Most tests share one pattern: something closes the connection roughly two seconds after it is
 * established, and the client's `onDisconnect` handler reconnects until it has seen enough disconnects,
 * at which point it calls `stopEndPoints()`.
 *
 * The handlers use `getAndIncrement()`, which yields the value *before* the increment. They reconnect for
 * the observed values 0, 1 and 2 and shut down when the observed value is 3, so the counter ends at 4.
 * That is why every counting test asserts `4`.
 */
class DisconnectReconnectTest : BaseTest() {
    /** Number of disconnect events seen by the client under test; bumped once per `onDisconnect` callback. */
    private val reconnectCount = atomic(0)

    /**
     * The server closes every connection two seconds after it connects; the client reconnects from its
     * disconnect handler until the fourth disconnect, then stops all endpoints.
     */
    @Test
    fun reconnectClient() {
        run {
            val configuration = serverConfig()

            val server: Server<Connection> = Server(configuration)
            addEndPoint(server)
            server.bind()

            server.onConnect {
                logger.error("Disconnecting after 2 seconds.")
                delay(2000)

                logger.error("Disconnecting....")
                close()
            }
        }

        run {
            val config = clientConfig()

            val client: Client<Connection> = Client(config)
            addEndPoint(client)

            client.onDisconnect {
                logger.error("Disconnected!")

                // value BEFORE the increment: 0, 1, 2 -> reconnect; 3 -> shut down
                val count = reconnectCount.getAndIncrement()
                if (count == 3) {
                    logger.error("Shutting down")
                    stopEndPoints()
                } else {
                    logger.error("Reconnecting: $count")
                    try {
                        client.connect(LOCALHOST)
                    } catch (e: IOException) {
                        e.printStackTrace()
                    }
                }
            }

            client.connect(LOCALHOST)
        }

        waitForThreads()

        System.err.println("Connection count (after reconnecting) is: ${reconnectCount.value}")
        Assert.assertEquals(4, reconnectCount.value)
    }

    /**
     * Same cycle as [reconnectClient], but here the *client* closes its own connection two seconds after
     * connecting. Both endpoints are configured with `uniqueAeronDirectory = true`.
     */
    @Test
    fun reconnectClientViaClientClose() {
        run {
            val configuration = serverConfig {
                uniqueAeronDirectory = true
            }

            val server: Server<Connection> = Server(configuration)
            addEndPoint(server)
            server.bind()
        }

        run {
            val config = clientConfig {
                uniqueAeronDirectory = true
            }

            val client: Client<Connection> = Client(config)
            addEndPoint(client)

            client.onConnect {
                logger.error("Disconnecting after 2 seconds.")
                delay(2000)

                logger.error("Disconnecting....")
                close()
            }

            client.onDisconnect {
                logger.error("Disconnected!")

                // value BEFORE the increment: 0, 1, 2 -> reconnect; 3 -> shut down
                val count = reconnectCount.getAndIncrement()
                if (count < 3) {
                    logger.error("Reconnecting: $count")
                    client.connect(LOCALHOST)
                } else {
                    logger.error("Shutting down")
                    stopEndPoints()
                }
            }

            client.connect(LOCALHOST)
        }

        waitForThreads()

        System.err.println("Connection count (after reconnecting) is: ${reconnectCount.value}")
        Assert.assertEquals(4, reconnectCount.value)
    }

    /** RMI-visible contract used by [reconnectRmiClient] to let the server ask the client to close. */
    interface CloseIface {
        fun close()
    }

    /** Client-side implementation of [CloseIface] registered for RMI in [reconnectRmiClient]. */
    class CloseImpl : CloseIface {
        override fun close() {
            // the connection specific one is called instead
        }

        /** Connection-aware overload: closes [connection], blocking the calling thread until done. */
        fun close(connection: Connection) {
            runBlocking {
                connection.close()
            }
        }
    }

    /**
     * Like [reconnectClient], except the server triggers the disconnect through RMI: it fetches the
     * client's [CloseIface] object and invokes `close()` on it asynchronously.
     */
    @Test
    fun reconnectRmiClient() {
        // RMI object id shared by the server (lookup) and the client (registration)
        val closeId = 33

        run {
            val config = serverConfig()
            config.serialization.rmi.register(CloseIface::class.java)

            val server: Server<Connection> = Server(config)
            addEndPoint(server)
            server.bind()

            server.onConnect {
                logger.error("Disconnecting after 2 seconds.")
                delay(2000)

                logger.error("Disconnecting via RMI ....")
                val closerObject = rmi.get<CloseIface>(closeId)

                // the close operation will kill the connection, preventing the response from returning.
                RemoteObject.cast(closerObject).async = true

                // this just calls connection.close() (on the client)
                closerObject.close()
            }
        }

        run {
            val config = clientConfig()
            config.serialization.rmi.register(CloseIface::class.java, CloseImpl::class.java)

            val client: Client<Connection> = Client(config)
            addEndPoint(client)

            client.onInit {
                rmi.save(CloseImpl(), closeId)
            }

            client.onDisconnect {
                logger.error("Disconnected!")

                // value BEFORE the increment: 0, 1, 2 -> reconnect; 3 -> shut down
                val count = reconnectCount.getAndIncrement()
                if (count == 3) {
                    logger.error("Shutting down")
                    stopEndPoints()
                } else {
                    logger.error("Reconnecting: $count")
                    try {
                        client.connect(LOCALHOST)
                    } catch (e: IOException) {
                        e.printStackTrace()
                    }
                }
            }

            client.connect(LOCALHOST)
        }

        waitForThreads()

        //System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
        Assert.assertEquals(4, reconnectCount.value)
    }

    /**
     * Runs the [reconnectClient] cycle while an [AeronDriver] that this test started by hand is active.
     *
     * The driver is closed in a `finally` block so that it is released even when endpoint setup or
     * `waitForThreads()` throws.
     */
    @Test
    fun manualMediaDriverAndReconnectClient() {
        val log = KotlinLogging.logger("DCUnitTest")

        // NOTE: once a config is assigned to a driver, the config cannot be changed
        val aeronDriver = runBlocking {
            val driver = AeronDriver(serverConfig(), log)
            driver.start()
            driver
        }

        try {
            run {
                val serverConfiguration = serverConfig()

                val server: Server<Connection> = Server(serverConfiguration)
                addEndPoint(server)
                server.bind()

                server.onConnect {
                    logger.error("Disconnecting after 2 seconds.")
                    delay(2000)

                    logger.error("Disconnecting....")
                    close()
                }
            }

            run {
                val config = clientConfig()

                val client: Client<Connection> = Client(config)
                addEndPoint(client)

                client.onDisconnect {
                    logger.error("Disconnected!")

                    // value BEFORE the increment: 0, 1, 2 -> reconnect; 3 -> shut down
                    val count = reconnectCount.getAndIncrement()
                    if (count == 3) {
                        logger.error("Shutting down")
                        stopEndPoints()
                    } else {
                        logger.error("Reconnecting: $count")
                        try {
                            client.connect(LOCALHOST)
                        } catch (e: IOException) {
                            e.printStackTrace()
                        }
                    }
                }

                client.connect(LOCALHOST)
            }

            waitForThreads()
        } finally {
            // always release the manually started driver, including on failure paths
            runBlocking {
                aeronDriver.close()
            }
        }

        //System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
        Assert.assertEquals(4, reconnectCount.value)
    }

    /**
     * Runs the [reconnectClient] cycle with mismatched IPC settings: the server has IPC disabled and the
     * client has it enabled.
     */
    @Test
    fun reconnectWithFallbackClient() {
        // this tests IPC with fallback to UDP (because the server has IPC disabled, and the client has it enabled)
        run {
            val config = serverConfig()
            config.enableIpc = false

            val server: Server<Connection> = Server(config)
            addEndPoint(server)
            server.bind()

            server.onConnect {
                logger.error("Disconnecting after 2 seconds.")
                delay(2000)

                logger.error("Disconnecting....")
                close()
            }
        }

        run {
            val config = clientConfig()
            config.enableIpc = true

            val client: Client<Connection> = Client(config)
            addEndPoint(client)

            client.onDisconnect {
                logger.error("Disconnected!")

                // value BEFORE the increment: 0, 1, 2 -> reconnect; 3 -> shut down
                val count = reconnectCount.getAndIncrement()
                if (count == 3) {
                    logger.error("Count reached, shutting down")
                    stopEndPoints()
                } else {
                    logger.error("Reconnecting: $count")
                    try {
                        client.connect(LOCALHOST)
                    } catch (e: IOException) {
                        e.printStackTrace()
                    }
                }
            }

            client.connect(LOCALHOST)
        }

        waitForThreads()

        //System.err.println("Connection count (after reconnecting) is: " + reconnectCount.value)
        Assert.assertEquals(4, reconnectCount.value)
    }

    /**
     * Connects a client to a server (both with IPC disabled and unique Aeron directories), then closes
     * the server. The client's disconnect handler stops all endpoints.
     *
     * There is no explicit assertion; the test relies on `waitForThreads()` returning.
     * NOTE(review): presumably `waitForThreads()` blocks until the endpoints have stopped - confirm in BaseTest.
     */
    @Test
    fun disconnectedMediaDriver() {
        val server: Server<Connection>
        run {
            val config = serverConfig()
            config.enableIpc = false
            config.uniqueAeronDirectory = true

            server = Server(config)
            addEndPoint(server)
            server.bind()

            server.onConnect {
                logger.error("Connected!")
            }
        }

        val client: Client<Connection>
        run {
            val config = clientConfig()
            config.enableIpc = false
            config.uniqueAeronDirectory = true

            client = Client(config)
            addEndPoint(client)

            client.onConnect {
                logger.error("Connected!")
            }

            client.onDisconnect {
                stopEndPoints()
            }

            client.connect(LOCALHOST)
        }

        // closing the server is what is expected to trigger the client's onDisconnect handler
        server.close()

        waitForThreads()
    }
}