Cleaned up heap vs stack access, fixed ping support, fixed coroutine errors during serialization

This commit is contained in:
Robinson 2021-07-08 14:33:24 +02:00
parent 632603c8c7
commit c725806727
14 changed files with 285 additions and 207 deletions

View File

@ -26,6 +26,7 @@ import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import dorkbox.util.Sys
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.launch
@ -629,11 +630,11 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @return true if the ping was successfully sent to the client
*/
suspend fun ping(function: suspend Ping.() -> Unit): Boolean {
suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean {
val c = connection0
if (c != null) {
return pingManager.ping(c, actionDispatch, responseManager, function)
return pingManager.ping(c, pingTimeoutSeconds, actionDispatch, responseManager, logger, function)
} else {
logger.error("No connection!", ClientException("Cannot send a ping when there is no connection!"))
}
@ -646,9 +647,9 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
*
* @param function called when the ping returns (ie: update time/latency counters/metrics/etc)
*/
fun pingBlocking(function: suspend Ping.() -> Unit): Boolean {
fun pingBlocking(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean {
return runBlocking {
ping(function)
ping(pingTimeoutSeconds, function)
}
}

View File

@ -183,7 +183,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
publication,
sessionId,
message,
aeronDriver)
aeronDriver,
logger)
}
override fun poll(): Int { return subscription.poll(handler, 1) }
@ -269,7 +270,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddress,
message,
aeronDriver,
false)
false,
logger)
}
override fun poll(): Int { return subscription.poll(handler, 1) }
@ -355,7 +357,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddress,
message,
aeronDriver,
false)
false,
logger)
}
override fun poll(): Int { return subscription.poll(handler, 1) }
@ -441,7 +444,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
clientAddress,
message,
aeronDriver,
true)
true,
logger)
}
override fun poll(): Int { return subscription.poll(handler, 1) }

View File

@ -22,6 +22,7 @@ import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.handshake.ConnectionCounts
import dorkbox.network.handshake.RandomIdAllocator
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import dorkbox.network.rmi.RmiSupportConnection
import io.aeron.FragmentAssembler
import io.aeron.Publication
@ -88,6 +89,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val listenerManager = atomic<ListenerManager<Connection>?>(null)
val logger = endPoint.logger
private val isClosed = atomic(false)
internal var preCloseAction: suspend () -> Unit = {}
internal var postCloseAction: suspend () -> Unit = {}
@ -96,9 +98,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private var connectionLastCheckTime = 0L
private var connectionTimeoutTime = 0L
private val isClosed = atomic(false)
private val connectionCheckIntervalInMS = connectionParameters.endPoint.config.connectionCheckIntervalInMS
private val connectionExpirationTimoutInMS = connectionParameters.endPoint.config.connectionExpirationTimoutInMS
@ -243,8 +242,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @return true if the message was successfully sent by aeron
*/
suspend fun ping(function: suspend Ping.() -> Unit): Boolean {
return endPoint.pingManager.ping(this, endPoint.actionDispatch, endPoint.responseManager, function)
suspend fun ping(pingTimeoutSeconds: Int = PingManager.DEFAULT_TIMEOUT_SECONDS, function: suspend Ping.() -> Unit): Boolean {
return endPoint.ping(this, pingTimeoutSeconds, function)
}
/**

View File

@ -113,7 +113,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
val storage: SettingsStore
internal val responseManager = ResponseManager(logger, actionDispatch)
internal val rmiGlobalSupport = RmiManagerGlobal(logger, listenerManager)
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger)
internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION>
internal val pingManager = PingManager<CONNECTION>()
@ -310,6 +310,15 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
* @return true if the message was successfully sent by aeron
*/
internal suspend fun ping(connection: Connection, pingTimeoutMs: Int, function: suspend Ping.() -> Unit): Boolean {
return pingManager.ping(connection, pingTimeoutMs, actionDispatch, responseManager, logger, function)
}
/**
* NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
* CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
@ -386,6 +395,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
internal fun readHandshakeMessage(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Any? {
return try {
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = handshakeKryo.read(buffer, offset, length)
logger.trace {
@ -415,31 +425,35 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
@Suppress("UNCHECKED_CAST")
connection as CONNECTION
val message: Any?
try {
message = serialization.readMessage(buffer, offset, length, connection)
logger.trace {
"[${header.sessionId()}] received: $message"
// NOTE: This ABSOLUTELY MUST be done on the same thread! This cannot be done on a new one, because the buffer could change!
val message = serialization.readMessage(buffer, offset, length, connection)
logger.trace { "[${header.sessionId()}] received: $message" }
// NOTE: This MUST be on a new co-routine
actionDispatch.launch {
try {
processMessage(message, connection)
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
}
}
} catch (e: Exception) {
// The handshake sessionId IS NOT globally unique
logger.error("[${header.sessionId()}] Error de-serializing message", e)
listenerManager.notifyError(connection, e)
return // don't do anything!
}
}
/**
* Actually process the message.
*/
private suspend fun processMessage(message: Any?, connection: CONNECTION) {
when (message) {
is Ping -> {
actionDispatch.launch {
try {
pingManager.manage(connection, responseManager, message)
} catch (e: Exception) {
logger.error("Error processing PING message", e)
listenerManager.notifyError(connection, e)
}
}
pingManager.manage(connection, responseManager, message, logger)
}
// small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work.
@ -448,26 +462,26 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
is RmiMessage -> {
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this require special functionality
rmiGlobalSupport.manage(this@EndPoint, serialization, connection, message, rmiConnectionSupport, actionDispatch)
rmiGlobalSupport.manage(serialization, connection, message, rmiConnectionSupport, responseManager, logger)
}
is Any -> {
actionDispatch.launch {
try {
@Suppress("UNCHECKED_CAST")
var hasListeners = listenerManager.notifyOnMessage(connection, message)
try {
@Suppress("UNCHECKED_CAST")
var hasListeners = listenerManager.notifyOnMessage(connection, message)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(message)
// each connection registers, and is polled INDEPENDENTLY for messages.
hasListeners = hasListeners or connection.notifyOnMessage(message)
if (!hasListeners) {
logger.error("No message callbacks found for ${message::class.java.simpleName}")
}
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
if (!hasListeners) {
logger.error("No message callbacks found for ${message::class.java.simpleName}")
}
} catch (e: Exception) {
logger.error("Error processing message", e)
listenerManager.notifyError(connection, e)
}
}
else -> {
// do nothing, there were problems with the message
if (message != null) {
@ -479,6 +493,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
}
/**
* NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
*

View File

@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit
* 'notifyConnect' must be THE ONLY THING in this class to use the action dispatch!
*/
@Suppress("DuplicatedCode")
internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLogger,
internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
private val config: ServerConfiguration,
private val listenerManager: ListenerManager<CONNECTION>) {
@ -68,12 +68,15 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
* @return true if we should continue parsing the incoming message, false if we should abort
*/
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD. ONLY RESPONSES ARE ON ACTION DISPATCH!
private fun validateMessageTypeAndDoPending(server: Server<CONNECTION>,
actionDispatch: CoroutineScope,
handshakePublication: Publication,
message: HandshakeMessage,
sessionId: Int,
connectionString: String): Boolean {
private fun validateMessageTypeAndDoPending(
server: Server<CONNECTION>,
actionDispatch: CoroutineScope,
handshakePublication: Publication,
message: HandshakeMessage,
sessionId: Int,
connectionString: String,
logger: KLogger
): Boolean {
// check to see if this sessionId is ALREADY in use by another connection!
// this can happen if there are multiple connections from the SAME ip address (ie: localhost)
@ -131,11 +134,14 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
* @return true if we should continue parsing the incoming message, false if we should abort
*/
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
private fun validateUdpConnectionInfo(server: Server<CONNECTION>,
handshakePublication: Publication,
config: ServerConfiguration,
clientAddressString: String,
clientAddress: InetAddress): Boolean {
private fun validateUdpConnectionInfo(
server: Server<CONNECTION>,
handshakePublication: Publication,
config: ServerConfiguration,
clientAddressString: String,
clientAddress: InetAddress,
logger: KLogger
): Boolean {
try {
// VALIDATE:: Check to see if there are already too many clients connected.
@ -182,16 +188,19 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
fun processIpcHandshakeMessageServer(server: Server<CONNECTION>,
rmiConnectionSupport: RmiManagerConnections<CONNECTION>,
handshakePublication: Publication,
sessionId: Int,
message: HandshakeMessage,
aeronDriver: AeronDriver) {
fun processIpcHandshakeMessageServer(
server: Server<CONNECTION>,
rmiConnectionSupport: RmiManagerConnections<CONNECTION>,
handshakePublication: Publication,
sessionId: Int,
message: HandshakeMessage,
aeronDriver: AeronDriver,
logger: KLogger
) {
val connectionString = "IPC"
if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, connectionString)) {
if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, connectionString, logger)) {
return
}
@ -325,9 +334,18 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
clientAddress: InetAddress,
message: HandshakeMessage,
aeronDriver: AeronDriver,
isIpv6Wildcard: Boolean) {
isIpv6Wildcard: Boolean,
logger: KLogger) {
if (!validateMessageTypeAndDoPending(server, server.actionDispatch, handshakePublication, message, sessionId, clientAddressString)) {
if (!validateMessageTypeAndDoPending(
server,
server.actionDispatch,
handshakePublication,
message,
sessionId,
clientAddressString,
logger
)) {
return
}
@ -343,7 +361,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
if (!clientAddress.isLoopbackAddress &&
!validateUdpConnectionInfo(server, handshakePublication, config, clientAddressString, clientAddress)) {
!validateUdpConnectionInfo(server, handshakePublication, config, clientAddressString, clientAddress, logger)) {
// we do not want to limit loopback addresses!
return
}

View File

@ -21,29 +21,33 @@ package dorkbox.network.ping
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiUtils
import kotlinx.coroutines.CoroutineScope
import mu.KLogger
/**
* How to handle ping messages
*/
internal class PingManager<CONNECTION : Connection> {
@Suppress("UNCHECKED_CAST")
suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, message: Ping) {
if (message.pongTime == 0L) {
message.pongTime = System.currentTimeMillis()
connection.send(message)
} else {
message.finishedTime = System.currentTimeMillis()
companion object {
val DEFAULT_TIMEOUT_SECONDS = 30
}
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
@Suppress("UNCHECKED_CAST")
suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, ping: Ping, logger: KLogger) {
if (ping.pongTime == 0L) {
ping.pongTime = System.currentTimeMillis()
connection.send(ping)
} else {
ping.finishedTime = System.currentTimeMillis()
val rmiId = ping.packedId
// process the ping message so that our ping callback does something
// this will be null if the ping took longer than 30 seconds and was cancelled
val result = responseManager.getWaiterCallback(rmiId) as (suspend Ping.() -> Unit)?
val result = responseManager.getWaiterCallback(rmiId, logger) as (suspend Ping.() -> Unit)?
if (result != null) {
result(message)
result(ping)
}
}
}
@ -53,17 +57,29 @@ internal class PingManager<CONNECTION : Connection> {
*
* @return true if the message was successfully sent by aeron
*/
internal suspend fun ping(connection: Connection, actionDispatch: CoroutineScope, responseManager: ResponseManager, function: suspend Ping.() -> Unit): Boolean {
val id = responseManager.prepWithCallback(function)
internal suspend fun ping(
connection: Connection,
pingTimeoutSeconds: Int,
actionDispatch: CoroutineScope,
responseManager: ResponseManager,
logger: KLogger,
function: suspend Ping.() -> Unit
): Boolean {
val id = responseManager.prepWithCallback(function, logger)
val ping = Ping()
ping.packedId = RmiUtils.unpackUnsignedRight(id)
ping.packedId = id
ping.pingTime = System.currentTimeMillis()
// NOTE: the timout MUST NOT be more than the max SHORT value!
if (pingTimeoutSeconds > 60) {
// just over 1 minute timeout.
throw IllegalArgumentException("Ping timeout parameter `pingTimeoutSeconds` cannot exceed 60 seconds")
}
// ALWAYS cancel the ping after 30 seconds
responseManager.cancelRequest(actionDispatch, 30_000L, id) {
responseManager.cancelRequest(actionDispatch, pingTimeoutSeconds.toLong(), id, logger) {
// kill the callback, since we are now "cancelled". If there is a race here (and the response comes at the exact same time)
// we don't care since either it will be null or it won't (if it's not null, it will run the callback)
result = null

View File

@ -0,0 +1,37 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
class PingSerializer: Serializer<Ping>() {
override fun write(kryo: Kryo, output: Output, ping: Ping) {
output.writeInt(ping.packedId)
output.writeLong(ping.pingTime)
output.writeLong(ping.pongTime)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Ping>): Ping {
val ping = Ping()
ping.packedId = input.readInt()
ping.pingTime = input.readLong()
ping.pongTime = input.readLong()
return ping
}
}

View File

@ -31,7 +31,7 @@ import kotlin.concurrent.write
*
* response ID's and the memory they hold will leak if the response never arrives!
*/
internal class ResponseManager(private val logger: KLogger, actionDispatch: CoroutineScope) {
internal class ResponseManager(logger: KLogger, actionDispatch: CoroutineScope) {
companion object {
val TIMEOUT_EXCEPTION = Exception()
}
@ -53,22 +53,22 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
init {
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
val ids = mutableListOf<Int>()
// MIN (32768) -> -1 (65535)
// ZERO is special, and is never added!
// ONE is special, and is used for ASYNC (the response will never be sent back)
// 2 (2) -> MAX (32767)
for (id in Short.MIN_VALUE..-1) {
ids.add(id)
}
// MIN (32768) -> -1 (65535)
// 2 (2) -> MAX (32767)
// ZERO is special, and is never added!
// ONE is special, and is used for ASYNC (the response will never be sent back)
for (id in 2..Short.MAX_VALUE) {
ids.add(id)
}
ids.shuffle()
// populate the array of randomly assigned ID's + waiters. This can happen in a new thread
// populate the array of randomly assigned ID's + waiters. It is OK for this to happen in a new thread
actionDispatch.launch {
try {
for (it in ids) {
@ -81,14 +81,13 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
}
}
/**
* Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted
*
* resume any pending remote object method invocations (if they are not async, or not manually waiting)
* NOTE: async RMI will never call this (because async doesn't return a response)
*/
suspend fun notifyWaiter(rmiId: Int, result: Any?) {
suspend fun notifyWaiter(rmiId: Int, result: Any?, logger: KLogger) {
logger.trace { "RMI return message: $rmiId" }
val previous = pendingLock.write {
@ -111,7 +110,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
*
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
*/
suspend fun <T> getWaiterCallback(rmiId: Int): T? {
suspend fun <T> getWaiterCallback(rmiId: Int, logger: KLogger): T? {
logger.trace { "RMI return message: $rmiId" }
val previous = pendingLock.write {
@ -139,7 +138,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
*
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/
suspend fun prep(): ResponseWaiter {
suspend fun prep(logger: KLogger): ResponseWaiter {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
@ -147,9 +146,11 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id)
pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
pending[rmiId] = responseRmi
}
return responseRmi
@ -160,7 +161,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
*
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/
suspend fun prepWithCallback(function: Any): Int {
suspend fun prepWithCallback(function: Any, logger: KLogger): Int {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
@ -171,19 +172,22 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
// assign the callback that will be notified when the return message is received
responseRmi.result = function
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
val rmiId = RmiUtils.unpackUnsignedRight(responseRmi.id)
pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
pending[rmiId] = responseRmi
}
return responseRmi.id
return rmiId
}
/**
* Cancels the RMI request in the given timeout, the callback is executed inside the read lock
*/
fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, onCancelled: ResponseWaiter.() -> Unit) {
fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, logger: KLogger, onCancelled: ResponseWaiter.() -> Unit) {
actionDispatch.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
@ -207,7 +211,7 @@ internal class ResponseManager(private val logger: KLogger, actionDispatch: Coro
*
* @return the result (can be null) or timeout exception
*/
suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? {
suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long, logger: KLogger): Any? {
val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
logger.trace {

View File

@ -19,6 +19,7 @@ import dorkbox.network.connection.Connection
import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import mu.KLogger
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.util.*
@ -76,7 +77,7 @@ internal class RmiClient(val isGlobal: Boolean,
private var enableEquals = false
// if we are ASYNC, then this method immediately returns
private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest): Any? {
private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest, logger: KLogger): Any? {
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
@ -102,12 +103,12 @@ internal class RmiClient(val isGlobal: Boolean,
null
} else {
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep
val rmiWaiter = responseManager.prep()
val rmiWaiter = responseManager.prep(logger)
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
connection.send(invokeMethod)
responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis)
responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis, logger)
}
}
@ -217,7 +218,7 @@ internal class RmiClient(val isGlobal: Boolean,
val continuation = suspendCoroutineArg as Continuation<Any?>
val suspendFunction: suspend () -> Any? = {
sendRequest(connection.endPoint.actionDispatch, invokeMethod)
sendRequest(connection.endPoint.actionDispatch, invokeMethod, connection.logger)
}
// function suspension works differently !!
@ -245,7 +246,7 @@ internal class RmiClient(val isGlobal: Boolean,
})
} else {
val any = runBlocking {
sendRequest(connection.endPoint.actionDispatch, invokeMethod)
sendRequest(connection.endPoint.actionDispatch, invokeMethod, connection.logger)
}
when (any) {
ResponseManager.TIMEOUT_EXCEPTION -> {

View File

@ -24,8 +24,6 @@ import dorkbox.network.rmi.messages.ConnectionObjectDeleteResponse
import dorkbox.network.serialization.Serialization
import dorkbox.util.classes.ClassHelper
import dorkbox.util.collections.LockFreeIntMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
class RmiManagerConnections<CONNECTION: Connection> internal constructor(
@ -140,12 +138,7 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
/**
* called on "server"
*/
fun onConnectionObjectCreateRequest(
serialization: Serialization<CONNECTION>,
connection: CONNECTION,
message: ConnectionObjectCreateRequest,
actionDispatch: CoroutineScope
) {
suspend fun onConnectionObjectCreateRequest(serialization: Serialization<CONNECTION>, connection: CONNECTION, message: ConnectionObjectCreateRequest) {
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val kryoId = RmiUtils.unpackRight(message.packedIds)
val objectParameters = message.objectParameters
@ -171,20 +164,14 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
ConnectionObjectCreateResponse(RmiUtils.packShorts(callbackId, rmiId))
}
actionDispatch.launch {
// we send the message ALWAYS, because the client needs to know it worked or not
connection.send(response)
}
// we send the message ALWAYS, because the client needs to know it worked or not
connection.send(response)
}
/**
* called on "client"
*/
fun onConnectionObjectCreateResponse(
connection: CONNECTION,
message: ConnectionObjectCreateResponse,
actionDispatch: CoroutineScope
) {
suspend fun onConnectionObjectCreateResponse(connection: CONNECTION, message: ConnectionObjectCreateResponse) {
val callbackId = RmiUtils.unpackLeft(message.packedIds)
val rmiId = RmiUtils.unpackRight(message.packedIds)
@ -204,25 +191,19 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
val proxyObject = getProxyObject(false, connection, rmiId, interfaceClass)
// this should be executed on a NEW coroutine!
actionDispatch.launch {
try {
callback(proxyObject)
} catch (e: Exception) {
ListenerManager.cleanStackTrace(e)
logger.error("RMI error connection ${connection.id}", e)
listenerManager.notifyError(connection, e)
}
try {
callback(proxyObject)
} catch (e: Exception) {
ListenerManager.cleanStackTrace(e)
logger.error("RMI error connection ${connection.id}", e)
listenerManager.notifyError(connection, e)
}
}
/**
* called on "client" or "server"
*/
fun onConnectionObjectDeleteRequest(
connection: CONNECTION,
message: ConnectionObjectDeleteRequest,
actionDispatch: CoroutineScope
) {
suspend fun onConnectionObjectDeleteRequest(connection: CONNECTION, message: ConnectionObjectDeleteRequest) {
val rmiId = message.rmiId
// we only delete the impl object if the RMI id is valid!
@ -238,10 +219,8 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
removeProxyObject(rmiId)
removeImplObject<Any?>(rmiId)
actionDispatch.launch {
// tell the "other side" to delete the proxy/impl object
connection.send(ConnectionObjectDeleteResponse(rmiId))
}
// tell the "other side" to delete the proxy/impl object
connection.send(ConnectionObjectDeleteResponse(rmiId))
}

View File

@ -16,18 +16,13 @@
package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.ListenerManager
import dorkbox.network.rmi.messages.*
import dorkbox.network.serialization.Serialization
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLogger,
private val listenerManager: ListenerManager<CONNECTION>) : RmiObjectCache(logger) {
internal class RmiManagerGlobal<CONNECTION: Connection>(logger: KLogger) : RmiObjectCache(logger) {
companion object {
/**
@ -93,32 +88,32 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLog
* Manages ALL OF THE RMI SCOPES
*/
@Suppress("DuplicatedCode")
fun manage(
endPoint: EndPoint<CONNECTION>,
suspend fun manage(
serialization: Serialization<CONNECTION>,
connection: CONNECTION,
message: Any,
rmiConnectionSupport: RmiManagerConnections<CONNECTION>,
actionDispatch: CoroutineScope
responseManager: ResponseManager,
logger: KLogger
) {
when (message) {
is ConnectionObjectCreateRequest -> {
/**
* called on "server"
*/
rmiConnectionSupport.onConnectionObjectCreateRequest(serialization, connection, message, actionDispatch)
rmiConnectionSupport.onConnectionObjectCreateRequest(serialization, connection, message)
}
is ConnectionObjectCreateResponse -> {
/**
* called on "client"
*/
rmiConnectionSupport.onConnectionObjectCreateResponse(connection, message, actionDispatch)
rmiConnectionSupport.onConnectionObjectCreateResponse(connection, message)
}
is ConnectionObjectDeleteRequest -> {
/**
* called on "client" or "server"
*/
rmiConnectionSupport.onConnectionObjectDeleteRequest(connection, message, actionDispatch)
rmiConnectionSupport.onConnectionObjectDeleteRequest(connection, message)
}
is ConnectionObjectDeleteResponse -> {
/**
@ -159,9 +154,7 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLog
logger
)
actionDispatch.launch {
connection.send(rmiMessage)
}
connection.send(rmiMessage)
}
return
}
@ -191,49 +184,47 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLog
if (isCoroutine) {
// https://stackoverflow.com/questions/47654537/how-to-run-suspend-method-via-reflection
// https://discuss.kotlinlang.org/t/calling-coroutines-suspend-functions-via-reflection/4672
actionDispatch.launch {
var suspendResult = kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn<Any?> { cont ->
// if we are a coroutine, we have to replace the LAST arg with the coroutine object
// we KNOW this is OK, because a continuation arg will always be there!
args!![args.size - 1] = cont
var suspendResult = kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn<Any?> { cont ->
// if we are a coroutine, we have to replace the LAST arg with the coroutine object
// we KNOW this is OK, because a continuation arg will always be there!
args!![args.size - 1] = cont
var insideResult: Any?
try {
// args!! is safe to do here (even though it doesn't make sense)
insideResult = cachedMethod.invoke(connection, implObject, args)
} catch (ex: Exception) {
insideResult = ex.cause
// added to prevent a stack overflow when references is false, (because 'cause' == "this").
// See:
// https://groups.google.com/forum/?fromgroups=#!topic/kryo-users/6PDs71M1e9Y
if (insideResult == null) {
insideResult = ex
}
else {
insideResult.initCause(null)
}
var insideResult: Any?
try {
// args!! is safe to do here (even though it doesn't make sense)
insideResult = cachedMethod.invoke(connection, implObject, args)
} catch (ex: Exception) {
insideResult = ex.cause
// added to prevent a stack overflow when references is false, (because 'cause' == "this").
// See:
// https://groups.google.com/forum/?fromgroups=#!topic/kryo-users/6PDs71M1e9Y
if (insideResult == null) {
insideResult = ex
}
insideResult
else {
insideResult.initCause(null)
}
}
insideResult
}
if (suspendResult === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) {
// we were suspending, and the stack will resume when possible, then it will call the response below
}
else {
if (suspendResult === Unit) {
// kotlin suspend returns, that DO NOT have a return value, REALLY return kotlin.Unit. This means there is no
// return value!
suspendResult = null
} else if (suspendResult is Exception) {
RmiUtils.cleanStackTraceForImpl(suspendResult, true)
logger.error("Connection ${connection.id}", suspendResult)
}
if (suspendResult === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) {
// we were suspending, and the stack will resume when possible, then it will call the response below
}
else {
if (suspendResult === Unit) {
// kotlin suspend returns, that DO NOT have a return value, REALLY return kotlin.Unit. This means there is no
// return value!
suspendResult = null
} else if (suspendResult is Exception) {
RmiUtils.cleanStackTraceForImpl(suspendResult, true)
logger.error("Connection ${connection.id}", suspendResult)
}
if (sendResponse) {
val rmiMessage = returnRmiMessage(message, suspendResult, logger)
connection.send(rmiMessage)
}
if (sendResponse) {
val rmiMessage = returnRmiMessage(message, suspendResult, logger)
connection.send(rmiMessage)
}
}
}
@ -261,20 +252,16 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLog
if (sendResponse) {
val rmiMessage = returnRmiMessage(message, result, logger)
actionDispatch.launch {
connection.send(rmiMessage)
}
connection.send(rmiMessage)
}
}
}
is MethodResponse -> {
// notify the pending proxy requests that we have a response!
actionDispatch.launch {
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
endPoint.responseManager.notifyWaiter(rmiId, result)
}
responseManager.notifyWaiter(rmiId, result, logger)
}
}
}

View File

@ -452,6 +452,11 @@ object RmiUtils {
return packedInt.toUShort().toInt()
}
@Suppress("EXPERIMENTAL_API_USAGE")
fun unpackUnsignedRight(packedLong: Long): Int {
return packedLong.toUShort().toInt()
}
fun makeFancyMethodName(method: CachedMethod): String {
return makeFancyMethodName(method.method)
}

View File

@ -24,6 +24,7 @@ import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingSerializer
import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils
import dorkbox.network.rmi.messages.*
@ -135,6 +136,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
private val rmiClientSerializer = RmiClientSerializer<CONNECTION>()
private val rmiServerSerializer = RmiServerSerializer<CONNECTION>()
private val pingSerializer = PingSerializer()
/**
* There is additional overhead to using RMI.
*
@ -335,7 +338,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
kryo.register(Ping::class.java)
kryo.register(Ping::class.java, pingSerializer)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, rmiClientSerializer)
@ -380,7 +383,7 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
kryo.register(Ping::class.java)
kryo.register(Ping::class.java, pingSerializer)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, rmiClientSerializer)

View File

@ -1,5 +1,6 @@
package dorkboxTest.network
import ch.qos.logback.classic.Level
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
@ -12,6 +13,8 @@ import org.junit.Test
class PingTest : BaseTest() {
@Test
fun RmiPing() {
setLogLevel(Level.TRACE)
val clientSuccess = atomic(false)
run {
@ -29,14 +32,20 @@ class PingTest : BaseTest() {
addEndPoint(client)
client.onConnect {
ping {
clientSuccess.value = true
repeat(100) {
ping {
println(it)
logger.error("out-bound: $outbound")
logger.error("in-bound: $inbound")
logger.error("round-trip: $roundtrip")
if (it == 99) {
clientSuccess.value = true
stopEndPoints()
logger.error("out-bound: $outbound")
logger.error("in-bound: $inbound")
logger.error("round-trip: $roundtrip")
stopEndPoints()
}
}
}
}