Merge branch 'session'

This commit is contained in:
Robinson 2023-09-17 02:37:36 +02:00
commit b2d349d17c
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
15 changed files with 357 additions and 185 deletions

View File

@ -45,74 +45,11 @@ import java.util.concurrent.*
* ASYNC.
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
*/
@Suppress("unused")
open class Client<CONNECTION : Connection>(
config: ClientConfiguration = ClientConfiguration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String = Client::class.java.simpleName)
: EndPoint<CONNECTION>(config, connectionFunc, loggerName) {
/**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
* ASYNC.
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/
constructor(config: ClientConfiguration,
loggerName: String,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION)
: this(config, connectionFunc, loggerName)
/**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
* ASYNC.
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/
constructor(config: ClientConfiguration,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION)
: this(config, connectionFunc, Client::class.java.simpleName)
/**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
* ASYNC.
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
*/
constructor(config: ClientConfiguration,
loggerName: String)
: this(config,
{
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
},
loggerName)
/**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
* ASYNC.
*
* @param config these are the specific connection options
*/
constructor(config: ClientConfiguration)
: this(config,
{
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
},
Client::class.java.simpleName)
open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientConfiguration(), loggerName: String = Client::class.java.simpleName)
: EndPoint<CONNECTION>(config, loggerName) {
companion object {
/**
@ -810,7 +747,7 @@ open class Client<CONNECTION : Connection>(
logger.info("Creating new connection to $logInfo")
}
val newConnection = connectionFunc(ConnectionParams(
val newConnection = newConnection(ConnectionParams(
connectionInfo.publicKey,
this,
clientConnection.connectionInfo,

View File

@ -39,76 +39,8 @@ import java.util.concurrent.*
* @param connectionFunc allows for custom connection implementations defined as a unit function
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
*/
open class Server<CONNECTION : Connection>(
config: ServerConfiguration = ServerConfiguration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String = Server::class.java.simpleName)
: EndPoint<CONNECTION>(config, connectionFunc, loggerName) {
/**
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
* server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
*
* To put it bluntly, ONLY have the server do work inside a listener!
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/
constructor(config: ServerConfiguration,
loggerName: String,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION)
: this(config, connectionFunc, loggerName)
/**
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
* server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
*
* To put it bluntly, ONLY have the server do work inside of a listener!
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/
constructor(config: ServerConfiguration,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION)
: this(config, connectionFunc, Server::class.java.simpleName)
/**
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
* server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
*
* To put it bluntly, ONLY have the server do work inside a listener!
*
* @param config these are the specific connection options
* @param loggerName allows for a custom logger name for this endpoint (for when there are multiple endpoints)
*/
constructor(config: ServerConfiguration,
loggerName: String = Server::class.java.simpleName)
: this(config,
{
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
},
loggerName)
/**
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
* server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
*
* To put it bluntly, ONLY have the server do work inside a listener!
*
* @param config these are the specific connection options
*/
constructor(config: ServerConfiguration)
: this(config,
{
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
},
Server::class.java.simpleName)
open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerConfiguration(), loggerName: String = Server::class.java.simpleName)
: EndPoint<CONNECTION>(config, loggerName) {
companion object {
/**

View File

@ -0,0 +1,33 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
class SessionClient<CONNECTION: Connection>(
config: ClientConfiguration = ClientConfiguration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String = Client::class.java.simpleName
) : Client<CONNECTION>(config, connectionFunc, loggerName) {
val decideConnection: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION = {
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
}
}

View File

@ -73,21 +73,15 @@ import java.util.concurrent.*
*
* @throws SecurityException if unable to initialize/generate ECC keys
*/
abstract class EndPoint<CONNECTION : Connection> private constructor(val type: Class<*>,
val config: Configuration,
internal val connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String)
{
abstract class EndPoint<CONNECTION : Connection> private constructor(val type: Class<*>, val config: Configuration, loggerName: String) {
protected constructor(config: Configuration,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String)
: this(Client::class.java, config, connectionFunc, loggerName)
: this(Client::class.java, config, loggerName)
protected constructor(config: ServerConfiguration,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
loggerName: String)
: this(Server::class.java, config, connectionFunc, loggerName)
: this(Server::class.java, config, loggerName)
companion object {
// connections are extremely difficult to diagnose when the connection timeout is short
@ -394,6 +388,14 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
}
}
/**
* This is called whenever a new connection is made. By overriding this, it is possible to customize the Connection type.
*/
open fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
@Suppress("UNCHECKED_CAST")
return Connection(connectionParameters) as CONNECTION
}
abstract fun newException(message: String, cause: Throwable? = null): Throwable
// used internally to remove a connection. Will also remove all proxy objects

View File

@ -0,0 +1,30 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.session
/**
* A session will persist across stop/reconnect/etc, as long as the client/server ID remains the same.
*
* If the client/server are closed for longer than XXX seconds, then the connection will be considered closed, and the session will be cleared.
*
* The sessionId is the public key of the remote endpoint.
*/
class Session(val sessionId: ByteArray) {
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.session
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
class SessionConnection(connectionParameters: ConnectionParams<*>): Connection(connectionParameters) {
@Volatile
var session: Session? = null
fun reconnect() {
}
}

View File

@ -230,7 +230,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
publicKey: ByteArray,
message: HandshakeMessage,
logInfo: String,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
logger: Logger
): Boolean {
val serialization = config.serialization
@ -347,7 +346,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
logger.info("Creating new connection to $logInfo")
}
connection = connectionFunc(ConnectionParams(
connection = server.newConnection(ConnectionParams(
publicKey,
server,
newConnectionDriver.pubSub,
@ -422,7 +421,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
isReliable: Boolean,
message: HandshakeMessage,
logInfo: String,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
logger: Logger
): Boolean {
val serialization = config.serialization
@ -576,7 +574,7 @@ internal class ServerHandshake<CONNECTION : Connection>(
logger.info("Creating new connection to $logInfo")
}
connection = connectionFunc(ConnectionParams(publicKey, server, newConnectionDriver.pubSub, validateRemoteAddress, cryptoSecretKey))
connection = server.newConnection(ConnectionParams(publicKey, server, newConnectionDriver.pubSub, validateRemoteAddress, cryptoSecretKey))
// VALIDATE:: are we allowed to connect to this server (now that we have the initial server information)
val permitConnection = listenerManager.notifyFilter(connection)

View File

@ -26,7 +26,6 @@ import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.AeronDriver.Companion.uriHandshake
import dorkbox.network.aeron.AeronPoller
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EventDispatcher
import dorkbox.network.connection.IpInfo
import dorkbox.network.exceptions.ServerException
@ -60,8 +59,7 @@ internal object ServerHandshakePollers {
val logger: Logger,
val server: Server<CONNECTION>,
val driver: AeronDriver,
val handshake: ServerHandshake<CONNECTION>,
val connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION
val handshake: ServerHandshake<CONNECTION>
): FragmentHandler {
private val isReliable = server.config.isReliable
@ -163,7 +161,6 @@ internal object ServerHandshakePollers {
publicKey = message.publicKey!!,
message = message,
logInfo = logInfo,
connectionFunc = connectionFunc,
logger = logger
)
@ -229,7 +226,6 @@ internal object ServerHandshakePollers {
val server: Server<CONNECTION>,
val driver: AeronDriver,
val handshake: ServerHandshake<CONNECTION>,
val connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
val isReliable: Boolean
): FragmentHandler {
companion object {
@ -392,7 +388,6 @@ internal object ServerHandshakePollers {
isReliable = isReliable,
message = message,
logInfo = logInfo,
connectionFunc = connectionFunc,
logger = logger
)
@ -460,7 +455,6 @@ internal object ServerHandshakePollers {
fun <CONNECTION : Connection> ipc(server: Server<CONNECTION>, handshake: ServerHandshake<CONNECTION>): AeronPoller {
val logger = server.logger
val connectionFunc = server.connectionFunc
val config = server.config as ServerConfiguration
val poller = try {
@ -481,7 +475,7 @@ internal object ServerHandshakePollers {
// - re-entrant with the client
val subscription = driver.subscription
val delegate = IpcProc(logger, server, server.aeronDriver, handshake, connectionFunc)
val delegate = IpcProc(logger, server, server.aeronDriver, handshake)
val handler = FragmentAssembler(delegate)
override fun poll(): Int {
@ -509,7 +503,6 @@ internal object ServerHandshakePollers {
fun <CONNECTION : Connection> ip4(server: Server<CONNECTION>, handshake: ServerHandshake<CONNECTION>): AeronPoller {
val logger = server.logger
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
@ -531,7 +524,7 @@ internal object ServerHandshakePollers {
// - re-entrant with the client
val subscription = driver.subscription
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, connectionFunc, isReliable)
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, isReliable)
val handler = FragmentAssembler(delegate)
override fun poll(): Int {
@ -557,7 +550,6 @@ internal object ServerHandshakePollers {
fun <CONNECTION : Connection> ip6(server: Server<CONNECTION>, handshake: ServerHandshake<CONNECTION>): AeronPoller {
val logger = server.logger
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
@ -579,7 +571,7 @@ internal object ServerHandshakePollers {
// - re-entrant with the client
val subscription = driver.subscription
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, connectionFunc, isReliable)
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, isReliable)
val handler = FragmentAssembler(delegate)
override fun poll(): Int {
@ -606,7 +598,6 @@ internal object ServerHandshakePollers {
fun <CONNECTION : Connection> ip6Wildcard(server: Server<CONNECTION>, handshake: ServerHandshake<CONNECTION>): AeronPoller {
val logger = server.logger
val connectionFunc = server.connectionFunc
val config = server.config
val isReliable = config.isReliable
@ -628,7 +619,7 @@ internal object ServerHandshakePollers {
// - re-entrant with the client
val subscription = driver.subscription
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, connectionFunc, isReliable)
val delegate = UdpProc(logger, server, server.aeronDriver, handshake, isReliable)
val handler = FragmentAssembler(delegate)
override fun poll(): Int {

View File

@ -0,0 +1,24 @@
/*
* Copyright 2021 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.dns;
/**
* Required for intellij to not complain regarding `module-info` for a multi-release jar.
* This file is completely ignored by the gradle build process
*/
public
class EmptyClass {}

View File

@ -3,6 +3,7 @@ module dorkbox.network {
exports dorkbox.network.aeron;
exports dorkbox.network.connection;
exports dorkbox.network.connection.streaming;
exports dorkbox.network.connection.cache;
exports dorkbox.network.connectionType;
exports dorkbox.network.exceptions;
exports dorkbox.network.handshake;

View File

@ -74,9 +74,12 @@ class ListenerTest : BaseTest() {
@Test
@Throws(SecurityException::class, InitializationException::class, IOException::class, InterruptedException::class)
fun listener() {
val server: Server<TestConnectionA> = Server(serverConfig()) {
TestConnectionA(it)
val server = object : Server<TestConnectionA>(serverConfig()) {
override fun newConnection(connectionParameters: ConnectionParams<TestConnectionA>): TestConnectionA {
return TestConnectionA(connectionParameters)
}
}
addEndPoint(server)
// has session/stream count errors!
@ -121,9 +124,12 @@ class ListenerTest : BaseTest() {
// ----
val client: Client<TestConnectionA> = Client(clientConfig()) {
TestConnectionA(it)
val client = object : Client<TestConnectionA>(clientConfig()) {
override fun newConnection(connectionParameters: ConnectionParams<TestConnectionA>): TestConnectionA {
return TestConnectionA(connectionParameters)
}
}
addEndPoint(client)

View File

@ -0,0 +1,111 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkboxTest.network.rmi.RmiCommonTest
import dorkboxTest.network.rmi.cows.MessageWithTestCow
import dorkboxTest.network.rmi.cows.TestCow
import dorkboxTest.network.rmi.cows.TestCowImpl
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Test
class ReconnectTest: BaseTest() {
@Test
fun rmiReconnectPersistence() {
val server = run {
val configuration = serverConfig()
configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
configuration.serialization.register(MessageWithTestCow::class.java)
configuration.serialization.register(UnsupportedOperationException::class.java)
// for Client -> Server RMI
configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
val server = Server<Connection>(configuration)
addEndPoint(server)
server.onMessage<MessageWithTestCow> { m ->
server.logger.error("Received finish signal for test for: Client -> Server")
val `object` = m.testCow
val id = `object`.id()
Assert.assertEquals(23, id)
server.logger.error("Finished test for: Client -> Server")
rmi.delete(23)
// `object` is still a reference to the object!
// so we don't want to pass that back -- so pass back a new one
send(MessageWithTestCow(TestCowImpl(1)))
}
server
}
val client = run {
var firstRun = true
val configuration = clientConfig()
val client = Client<Connection>(configuration)
addEndPoint(client)
client.onConnect {
rmi.create<TestCow>(23) {
client.logger.error("Running test for: Client -> Server")
runBlocking {
RmiCommonTest.runTests(this@onConnect, this@create, 23)
}
client.logger.error("Done with test for: Client -> Server")
}
}
client.onMessage<MessageWithTestCow> { _ ->
// check if 23 still exists (it should not)
val obj = rmi.get<TestCow>(23)
try {
obj.id()
Assert.fail(".id() should throw an exception, the backing RMI object doesn't exist!")
} catch (e: Exception) {
// this is expected
}
if (firstRun) {
firstRun = false
client.close(false)
client.connectIpc()
} else {
stopEndPoints()
}
}
client
}
server.bindIpc()
client.connectIpc()
waitForThreads()
}
}

View File

@ -16,22 +16,15 @@
package dorkboxTest.network
import dorkbox.bytes.sha256
import dorkbox.executor.Executor.Companion.log
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.util.Sys
import dorkbox.network.connection.ConnectionParams
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.agrona.ExpandableDirectByteBuffer
import org.agrona.concurrent.SigInt
import org.junit.Assert
import org.junit.Test
import java.io.File
import java.security.SecureRandom
class ShutdownWhileInBadStateTest : BaseTest() {
@ -42,9 +35,12 @@ class ShutdownWhileInBadStateTest : BaseTest() {
val client = run {
val config = clientConfig()
val client: Client<Connection> = Client(config) {
Connection(it)
val client = object : Client<Connection>(config) {
override fun newConnection(connectionParameters: ConnectionParams<Connection>): Connection {
return Connection(connectionParameters)
}
}
addEndPoint(client)
client

View File

@ -20,6 +20,7 @@ import dorkbox.bytes.sha256
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.util.Sys
import org.agrona.ExpandableDirectByteBuffer
import org.junit.Assert
@ -56,9 +57,12 @@ class StreamingTest : BaseTest() {
val client = run {
val config = clientConfig()
val client: Client<Connection> = Client(config) {
Connection(it)
val client = object : Client<Connection>(config) {
override fun newConnection(connectionParameters: ConnectionParams<Connection>): Connection {
return Connection(connectionParameters)
}
}
addEndPoint(client)
client.onConnect {
@ -103,9 +107,7 @@ class StreamingTest : BaseTest() {
val client = run {
val config = clientConfig()
val client: Client<Connection> = Client(config) {
Connection(it)
}
val client = Client<Connection>(config)
addEndPoint(client)
client.onConnect {
@ -147,9 +149,7 @@ class StreamingTest : BaseTest() {
val client = run {
val config = clientConfig()
val client: Client<Connection> = Client(config) {
Connection(it)
}
val client = Client<Connection>(config)
addEndPoint(client)
client.onConnect {

View File

@ -159,4 +159,85 @@ class RmiSimpleActionsTest : BaseTest() {
waitForThreads()
}
@Test
fun rmiReconnectPersistence() {
val server = run {
val configuration = serverConfig()
configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
configuration.serialization.register(MessageWithTestCow::class.java)
configuration.serialization.register(UnsupportedOperationException::class.java)
// for Client -> Server RMI
configuration.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
val server = Server<Connection>(configuration)
addEndPoint(server)
server.onMessage<MessageWithTestCow> { m ->
server.logger.error("Received finish signal for test for: Client -> Server")
val `object` = m.testCow
val id = `object`.id()
Assert.assertEquals(23, id)
server.logger.error("Finished test for: Client -> Server")
rmi.delete(23)
// `object` is still a reference to the object!
// so we don't want to pass that back -- so pass back a new one
send(MessageWithTestCow(TestCowImpl(1)))
}
server
}
val client = run {
var firstRun = true
val configuration = clientConfig()
// configuration.serialization.registerRmi(TestCow::class.java, TestCowImpl::class.java)
val client = Client<Connection>(configuration)
addEndPoint(client)
client.onConnect {
rmi.create<TestCow>(23) {
client.logger.error("Running test for: Client -> Server")
runBlocking {
RmiCommonTest.runTests(this@onConnect, this@create, 23)
}
client.logger.error("Done with test for: Client -> Server")
}
}
client.onMessage<MessageWithTestCow> { _ ->
// check if 23 still exists (it should not)
val obj = rmi.get<TestCow>(23)
try {
obj.id()
Assert.fail(".id() should throw an exception, the backing RMI object doesn't exist!")
} catch (e: Exception) {
// this is expected
}
if (firstRun) {
firstRun = false
client.close(false)
client.connectIpc()
} else {
stopEndPoints()
}
}
client
}
server.bindIpc()
client.connectIpc()
waitForThreads()
}
}