WIP RMI and message performance/ping testing

This commit is contained in:
Robinson 2021-07-07 16:19:00 +02:00
parent 33f3ca8ebc
commit 632603c8c7
18 changed files with 384 additions and 516 deletions

View File

@ -633,7 +633,7 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val c = connection0
if (c != null) {
return pingManager.ping(c, function)
return pingManager.ping(c, actionDispatch, responseManager, function)
} else {
logger.error("No connection!", ClientException("Cannot send a ping when there is no connection!"))
}

View File

@ -22,7 +22,6 @@ import dorkbox.network.aeron.UdpMediaDriverPairedConnection
import dorkbox.network.handshake.ConnectionCounts
import dorkbox.network.handshake.RandomIdAllocator
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingMessage
import dorkbox.network.rmi.RmiSupportConnection
import io.aeron.FragmentAssembler
import io.aeron.Publication
@ -103,14 +102,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val connectionCheckIntervalInMS = connectionParameters.endPoint.config.connectionCheckIntervalInMS
private val connectionExpirationTimoutInMS = connectionParameters.endPoint.config.connectionExpirationTimoutInMS
/**
// * Returns the last calculated TCP return trip time, or -1 if or the [PingMessage] response has not yet been received.
// */
// val lastRoundTripTime: Int
// get() {
// val pingFuture2 = pingFuture
// return pingFuture2?.response ?: -1
// }
// while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error.
private val remoteKeyChanged = connectionParameters.publicKeyValidation == PublicKeyValidationState.TAMPERED
@ -250,12 +241,10 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
* Only 1 in-flight ping can be performed at a time. Calling ping() again, before the previous ping returns will do nothing.
*
* @return true if the message was successfully sent by aeron
*/
suspend fun ping(function: suspend Ping.() -> Unit): Boolean {
return endPoint.pingManager.ping(this, function)
return endPoint.pingManager.ping(this, endPoint.actionDispatch, endPoint.responseManager, function)
}
/**

View File

@ -28,7 +28,7 @@ import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.ipFilter.IpFilterRule
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import dorkbox.network.ping.PingMessage
import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.RmiManagerGlobal
import dorkbox.network.rmi.messages.MethodResponse
@ -80,8 +80,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
internal val listenerManager = ListenerManager<CONNECTION>(logger)
internal val connections = ConnectionManager<CONNECTION>()
internal val pingManager = PingManager<CONNECTION>(logger, actionDispatch)
internal val aeronDriver: AeronDriver
/**
@ -114,9 +112,12 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
*/
val storage: SettingsStore
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch)
internal val responseManager = ResponseManager(logger, actionDispatch)
internal val rmiGlobalSupport = RmiManagerGlobal(logger, listenerManager)
internal val rmiConnectionSupport: RmiManagerConnections<CONNECTION>
internal val pingManager = PingManager<CONNECTION>()
init {
require(!config.previouslyUsed) { "${type.simpleName} configuration cannot be reused!" }
config.validate()
@ -148,13 +149,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
if (type.javaClass == Server::class.java) {
// server cannot "get" global RMI objects, only the client can
@Suppress("UNCHECKED_CAST")
rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, config.serialization as Serialization<CONNECTION>)
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization<CONNECTION>)
{ _, _, _ ->
throw IllegalAccessException("Global RMI access is only possible from a Client connection!")
}
} else {
@Suppress("UNCHECKED_CAST")
rmiConnectionSupport = RmiManagerConnections(logger, rmiGlobalSupport, config.serialization as Serialization<CONNECTION>)
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization<CONNECTION>)
{ connection, objectId, interfaceClass ->
return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass)
}
@ -309,17 +310,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
}
}
/**
* Adds a function that will be called when a ping message is received
*
* @param function called when the ping returns (ie: update time/latency counters/metrics/etc)
*/
fun onPing(function: suspend CONNECTION.(Ping) -> Unit) {
actionDispatch.launch {
listenerManager.onPing(function)
}
}
/**
* NOTE: this **MUST** stay on the same co-routine that calls "send". This cannot be re-dispatched onto a different coroutine!
* CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
@ -441,11 +431,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
when (message) {
is PingMessage -> {
// the ping listener
is Ping -> {
actionDispatch.launch {
try {
pingManager.manage(this@EndPoint, connection, message, logger)
pingManager.manage(connection, responseManager, message)
} catch (e: Exception) {
logger.error("Error processing PING message", e)
listenerManager.notifyError(connection, e)
@ -457,16 +446,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always
// go in "lock step"
is RmiMessage -> {
actionDispatch.launch {
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this require special functionality
try {
rmiGlobalSupport.manage(this@EndPoint, connection, message, logger)
} catch (e: Exception) {
logger.error("Error processing RMI message", e)
listenerManager.notifyError(connection, e)
}
}
// if we are an RMI message/registration, we have very specific, defined behavior.
// We do not use the "normal" listener callback pattern because this require special functionality
rmiGlobalSupport.manage(this@EndPoint, serialization, connection, message, rmiConnectionSupport, actionDispatch)
}
is Any -> {
actionDispatch.launch {
@ -680,7 +662,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// Connections are closed first, because we want to make sure that no RMI messages can be received
// when we close the RMI support objects (in which case, weird - but harmless - errors show up)
// this will wait for RMI timeouts if there are RMI in-progress. (this happens if we close via and RMI method)
rmiGlobalSupport.close()
responseManager.close()
}
// the storage is closed via this as well.

View File

@ -16,7 +16,6 @@
package dorkbox.network.connection
import dorkbox.network.ipFilter.IpFilterRule
import dorkbox.network.ping.Ping
import dorkbox.os.OS
import dorkbox.util.classes.ClassHelper
import dorkbox.util.classes.ClassHierarchy
@ -155,9 +154,6 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
private val onMessageMap = atomic(IdentityMap<Class<*>, Array<suspend CONNECTION.(Any) -> Unit>>(32, LOAD_FACTOR))
private val onMessageMutex = Mutex()
private val onPingList = atomic(Array<suspend (CONNECTION.(Ping) -> Unit)>(0) { { } })
private val onPingMutex = Mutex()
// used to keep a cache of class hierarchy for distributing messages
private val classHierarchyCache = ClassHierarchy(LOAD_FACTOR)
@ -307,18 +303,6 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
}
}
/**
* Adds a function that will be called when a client/server "connects" with each other
*
* For a server, this function will be called for ALL clients.
*/
suspend fun onPing(function: suspend CONNECTION.(Ping) -> Unit) {
onPingMutex.withLock {
// we have to follow the single-writer principle!
onPingList.lazySet(add(function, onPingList.value))
}
}
/**
* Invoked just after a connection is created, but before it is connected.
*
@ -448,19 +432,4 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
return hasListeners
}
/**
* Invoked when a connection is 'pinged' by the remote endpoint
*/
suspend fun notifyPing(ping: Ping, connection: CONNECTION) {
onPingList.value.forEach {
try {
it(connection, ping)
} catch (t: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
cleanStackTrace(t)
logger.error("Connection ${connection.id} error", t)
}
}
}
}

View File

@ -15,26 +15,67 @@
*/
package dorkbox.network.ping
import dorkbox.network.rmi.RmiUtils
class Ping {
// /**
// * Wait for the ping to return, and returns the ping response time in MS or -1 if it failed.
// */
val time: Int = 0
//
// /**
// * Adds a ping listener to this future. The listener is notified when this future is done. If this future is already completed,
// * then the listener is notified immediately.
// */
// fun <C : Connection> add(listener: PingListener<C>)
//
// /**
// * Removes a ping listener from this future. The listener is no longer notified when this future is done. If the listener
// * was not previously associated with this future, this method does nothing and returns silently.
// */
// fun <C : Connection> remove(listener: PingListener<C>)
//
// /**
// * Cancel this Ping.
// */
// fun cancel()
var packedId = 0
// ping/pong times are the LOWER 8 bytes of a long, which gives us 65 seconds. This is the same as the max value timeout (a short) so this is acceptible
var pingTime = 0L
var pongTime = 0L
@Transient
var finishedTime = 0L
/**
* The time it took for the remote connection to return the ping to us. This is only accurate if the clocks are synchronized
*/
val inbound: Long
get() {
return finishedTime - pongTime
}
/**
* The time it took for us to ping the remote connection. This is only accurate if the clocks are synchronized.
*/
val outbound: Long
get() {
return pongTime - pingTime
}
/**
* The round-trip time it took to ping the remote connection
*/
val roundtrip: Long
get() {
return finishedTime - pingTime
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as Ping
if (packedId != other.packedId) return false
if (pingTime != other.pingTime) return false
if (pongTime != other.pongTime) return false
if (finishedTime != other.finishedTime) return false
return true
}
override fun hashCode(): Int {
var result = packedId
result = 31 * result + pingTime.hashCode()
result = 31 * result + pongTime.hashCode()
result = 31 * result + finishedTime.hashCode()
return result
}
override fun toString(): String {
return "Ping ${RmiUtils.unpackUnsignedRight(packedId)} (pingTime=$pingTime, pongTime=$pongTime, finishedTime=$finishedTime)"
}
}

View File

@ -1,22 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
import java.io.IOException
object PingCanceledException : IOException() {
private const val serialVersionUID = 9045461384091038605L
}

View File

@ -1,118 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("UNUSED_PARAMETER")
package dorkbox.network.ping
import dorkbox.network.connection.Connection
import java.util.concurrent.atomic.AtomicInteger
class PingFuture internal constructor() {
/**
* @return the ID of this ping future
*/
// private final Promise<PingTuple<? extends Connection>> promise;
val id: Int
private val sentTime: Long
// public
// PingFuture(Promise<PingTuple<? extends Connection>> promise) {
// this.promise = promise;
// this.id = pingCounter.getAndIncrement();
// this.sentTime = System.currentTimeMillis();
//
// if (this.id == Integer.MAX_VALUE) {
// pingCounter.set(0);
// }
// }// try {
// PingTuple<? extends Connection> entry = this.promise.syncUninterruptibly()
// .get();
// if (entry != null) {
// return entry.responseTime;
// }
// } catch (InterruptedException e) {
// } catch (ExecutionException e) {
// }
/**
* Wait for the ping to return, and returns the ping response time in MS or -1 if it failed.
*/
val response: Int
get() =// try {
// PingTuple<? extends Connection> entry = this.promise.syncUninterruptibly()
// .get();
// if (entry != null) {
// return entry.responseTime;
// }
// } catch (InterruptedException e) {
// } catch (ExecutionException e) {
// }
-1
/**
* Adds the specified listener to this future. The specified listener is notified when this future is done. If this future is already
* completed, the specified listener is notified immediately.
*/
fun <C : Connection> add(listener: PingListener<C>) {
// this.promise.addListener((GenericFutureListener) listener);
}
/**
* Removes the specified listener from this future. The specified listener is no longer notified when this future is done. If the
* specified listener is not associated with this future, this method does nothing and returns silently.
*/
fun <C : Connection> remove(listener: PingListener<C>) {
// this.promise.removeListener((GenericFutureListener) listener);
}
/**
* Cancel this Ping.
*/
fun cancel() {
// this.promise.tryFailure(new PingCanceledException());
}
/**
* This is when the endpoint that ORIGINALLY sent the ping, finally receives a response.
*/
fun <C : Connection?> setSuccess(connection: C, ping: PingMessage?) {
// if (ping.id == this.id) {
// long longTime = System.currentTimeMillis() - this.sentTime;
// if (longTime < Integer.MAX_VALUE) {
// this.promise.setSuccess(new PingTuple<C>(connection, (int) longTime));
// }
// else {
// this.promise.setSuccess(new PingTuple<C>(connection, Integer.MAX_VALUE));
// }
// }
}
// return this.promise.isSuccess();
val isSuccess: Boolean
get() =// return this.promise.isSuccess();
false
companion object {
private val pingCounter = AtomicInteger(0)
}
/**
* Protected constructor for when we are completely overriding this class. (Used by the "local" connection for instant pings)
*/
init {
// this(null);
id = -1
sentTime = -2
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
import dorkbox.network.connection.Connection
// note that we specifically DO NOT implement equals/hashCode, because we cannot create two separate
// listeners that are somehow equal to each other.
abstract class PingListener<C : Connection?> // implements GenericFutureListener<Future<PingTuple<C>>>
{
// @Override
// public
// void operationComplete(Future<PingTuple<C>> future) throws Exception {
// PingTuple<C> pingTuple = future.get();
// response(pingTuple.connection, pingTuple.responseTime);
// }
/**
* Called when the ping response has been received.
*/
abstract fun response(connection: C, pingResponseTime: Int)
override fun toString(): String {
return "PingListener"
}
}

View File

@ -20,73 +20,55 @@
package dorkbox.network.ping
import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.handshake.RandomIdAllocator
import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiUtils
import kotlinx.coroutines.CoroutineScope
import mu.KLogger
/**
*
* How to handle ping messages
*/
class PingManager<CONNECTION : Connection>(logger: KLogger, actionDispatch: CoroutineScope) {
/**
* allocates ID's for use when pinging a remote endpoint
*/
internal val pingIdAllocator = RandomIdAllocator(Integer.MIN_VALUE, Integer.MAX_VALUE)
internal class PingManager<CONNECTION : Connection> {
@Suppress("UNCHECKED_CAST")
suspend fun manage(connection: CONNECTION, responseManager: ResponseManager, message: Ping) {
if (message.pongTime == 0L) {
message.pongTime = System.currentTimeMillis()
connection.send(message)
} else {
message.finishedTime = System.currentTimeMillis()
internal val responseManager = ResponseManager(logger, actionDispatch)
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
// process the ping message so that our ping callback does something
// this will be null if the ping took longer than 30 seconds and was cancelled
val result = responseManager.getWaiterCallback(rmiId) as (suspend Ping.() -> Unit)?
if (result != null) {
result(message)
}
}
}
/**
* Updates the ping times for this connection (called when this connection gets a REPLY ping message).
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
* @return true if the message was successfully sent by aeron
*/
fun updatePingResponse(ping: PingMessage) {
//
// @Volatile
// private var pingFuture: PingFuture? = null
//
// pingFuture?.setSuccess(this, ping)
internal suspend fun ping(connection: Connection, actionDispatch: CoroutineScope, responseManager: ResponseManager, function: suspend Ping.() -> Unit): Boolean {
val id = responseManager.prepWithCallback(function)
val ping = Ping()
ping.packedId = RmiUtils.unpackUnsignedRight(id)
ping.pingTime = System.currentTimeMillis()
// NOTE: the timout MUST NOT be more than the max SHORT value!
// ALWAYS cancel the ping after 30 seconds
responseManager.cancelRequest(actionDispatch, 30_000L, id) {
// kill the callback, since we are now "cancelled". If there is a race here (and the response comes at the exact same time)
// we don't care since either it will be null or it won't (if it's not null, it will run the callback)
result = null
}
return connection.send(ping)
}
suspend fun manage(endPoint: EndPoint<CONNECTION>, connection: CONNECTION, message: PingMessage, logger: KLogger) {
// if (message.isReply) {
// connection.updatePingResponse(message)
// } else {
// // return the ping from whence it came
// message.isReply = true
// connection.send(message)
// }
}
suspend fun ping(function1: Connection, function: suspend Ping.() -> Unit): Boolean {
// val ping = PingMessage()
// ping.id = pingIdAllocator.allocate()
//
//
// pingFuture = PingFuture()
//
//// function: suspend (CONNECTION) -> Unit
// // TODO: USE AERON FOR THIS
//// val pingFuture2 = pingFuture
//// if (pingFuture2 != null && !pingFuture2.isSuccess) {
//// pingFuture2.cancel()
//// }
//// val newPromise: Promise<PingTuple<out Connection?>>
//// newPromise = if (channelWrapper.udp() != null) {
//// channelWrapper.udp()
//// .newPromise()
//// } else {
//// channelWrapper.tcp()
//// .newPromise()
//// }
// pingFuture = PingFuture()
//// val ping = PingMessage()
//// ping.id = pingFuture!!.id
//// ping0(ping)
//// return pingFuture!!
// TODO()
return false
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
/**
* Internal message to determine round trip time.
*/
class PingMessage {
var id: Int = 0
var isReply: Boolean = false
}

View File

@ -1,20 +0,0 @@
/*
* Copyright 2020 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
import dorkbox.network.connection.Connection
class PingTuple<C : Connection?>(var connection: C, var responseTime: Int)

View File

@ -15,7 +15,6 @@
*/
package dorkbox.network.rmi
import dorkbox.network.rmi.messages.MethodResponse
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
@ -32,7 +31,7 @@ import kotlin.concurrent.write
*
* response ID's and the memory they hold will leak if the response never arrives!
*/
class ResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
internal class ResponseManager(private val logger: KLogger, actionDispatch: CoroutineScope) {
companion object {
val TIMEOUT_EXCEPTION = Exception()
}
@ -77,20 +76,19 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
}
} catch (e: ClosedSendChannelException) {
// this can happen if we are starting/stopping an endpoint (and thus a response-manager) VERY quickly, and can be ignored
logger.trace("Error during RMI preparation. Usually this is caused by fast a start/stop")
logger.trace("Error during RMI preparation. Usually this is caused by fast a start-then-stop")
}
}
}
/**
* Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted
*
* resume any pending remote object method invocations (if they are not async, or not manually waiting)
* NOTE: async RMI will never call this (because async doesn't return a response)
*/
suspend fun onRmiMessage(message: MethodResponse) {
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
suspend fun notifyWaiter(rmiId: Int, result: Any?) {
logger.trace { "RMI return message: $rmiId" }
val previous = pendingLock.write {
@ -108,12 +106,40 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
}
}
/**
* Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted
*
* This is ONLY called when we want to get the data out of the stored entry, because we are operating ASYNC. (pure RMI async is different)
*/
suspend fun <T> getWaiterCallback(rmiId: Int): T? {
logger.trace { "RMI return message: $rmiId" }
val previous = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = null
previous
}
@Suppress("UNCHECKED_CAST")
if (previous is ResponseWaiter) {
val result = previous.result
// always return this to the cache!
waiterCache.send(previous)
rmiWaitersInUse.getAndDecrement()
return result as T
}
return null
}
/**
* gets the ResponseWaiter (id + waiter) and prepares the pending response map
*
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/
internal suspend fun prep(): ResponseWaiter {
suspend fun prep(): ResponseWaiter {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
@ -129,6 +155,51 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
return responseRmi
}
/**
* gets the ResponseWaiter (id + waiter) and prepares the pending response map
*
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/
suspend fun prepWithCallback(function: Any): Int {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
// assign the callback that will be notified when the return message is received
responseRmi.result = function
pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
}
return responseRmi.id
}
/**
* Cancels the RMI request in the given timeout, the callback is executed inside the read lock
*/
fun cancelRequest(actionDispatch: CoroutineScope, timeoutMillis: Long, rmiId: Int, onCancelled: ResponseWaiter.() -> Unit) {
actionDispatch.launch {
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
pendingLock.read {
val maybeResult = pending[rmiId]
if (maybeResult is ResponseWaiter) {
logger.trace { "RMI timeout ($timeoutMillis) with callback cancel: $rmiId" }
maybeResult.cancel()
onCancelled(maybeResult)
}
}
}
}
/**
* We only wait for a reply if we are SYNC.
*
@ -136,7 +207,7 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
*
* @return the result (can be null) or timeout exception
*/
suspend fun waitForReply(responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? {
suspend fun waitForReply(actionDispatch: CoroutineScope, responseWaiter: ResponseWaiter, timeoutMillis: Long): Any? {
val rmiId = RmiUtils.unpackUnsignedRight(responseWaiter.id) // this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
logger.trace {
@ -181,6 +252,7 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
}
// deletes the entry in the map
val resultOrWaiter = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = null
@ -200,67 +272,6 @@ class ResponseManager(private val logger: KLogger, private val actionDispatch: C
return resultOrWaiter
}
///////
///////
//// Callback waiting/execution. This is part of this class only because we want to use the SAME RMI SYSTEM for other things,
//// namely, being able to have an easier time setting up message responses
///////
///////
/**
* on response, runs the waiting callback
* NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone)
*/
suspend fun onCallbackMessage(message: MethodResponse) {
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
logger.trace { "RMI return callback: $rmiId" }
val previous = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = result
previous
}
// if NULL, since either we don't exist (because we were async), or it was cancelled
if (previous is ResponseWaiter) {
logger.trace { "RMI valid-cancel onCallback: $rmiId" }
// this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify()
}
}
/**
* gets the ResponseWaiter (id + waiter) and prepares the pending response map
*
* We ONLY care about the ID to get the correct response info.
* NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone)
*/
internal suspend fun prepForCallback(callback: (Any) -> Unit): ResponseWaiter {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI2 count: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
// assign the callback that will be notified when the return message is received
responseRmi.result = callback
pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
}
return responseRmi
}
suspend fun close() {
// wait for responses, or wait for timeouts!
while (rmiWaitersInUse.value > 0) {

View File

@ -30,7 +30,7 @@ data class ResponseWaiter(val id: Int) {
var channel: Channel<Unit> = Channel(Channel.RENDEZVOUS)
var isCancelled = false
// holds the RMI result or callback. This is ALWAYS accessed from within a lock!
// holds the RMI result or callback. This is ALWAYS accessed from within a lock (so no synchronize/volatile/etc necessary)!
var result: Any? = null
/**

View File

@ -17,17 +17,15 @@ package dorkbox.network.rmi
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.util.*
import kotlin.coroutines.*
object MyUnconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) = block.run() // !!!
}
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
* Handles network communication when methods are invoked on a proxy.
@ -78,7 +76,7 @@ internal class RmiClient(val isGlobal: Boolean,
private var enableEquals = false
// if we are ASYNC, then this method immediately returns
private suspend fun sendRequest(invokeMethod: MethodRequest): Any? {
private suspend fun sendRequest(actionDispatch: CoroutineScope, invokeMethod: MethodRequest): Any? {
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
@ -96,13 +94,12 @@ internal class RmiClient(val isGlobal: Boolean,
invokeMethod.isGlobal = isGlobal
if (isAsync) {
// If we are async, we ignore the response....
return if (isAsync) {
// If we are async, we ignore the response (don't invoke the response manager at all)....
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI)
connection.send(invokeMethod)
return null
null
} else {
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep
val rmiWaiter = responseManager.prep()
@ -110,7 +107,7 @@ internal class RmiClient(val isGlobal: Boolean,
connection.send(invokeMethod)
return responseManager.waitForReply(rmiWaiter, timeoutMillis)
responseManager.waitForReply(actionDispatch, rmiWaiter, timeoutMillis)
}
}
@ -220,7 +217,7 @@ internal class RmiClient(val isGlobal: Boolean,
val continuation = suspendCoroutineArg as Continuation<Any?>
val suspendFunction: suspend () -> Any? = {
sendRequest(invokeMethod)
sendRequest(connection.endPoint.actionDispatch, invokeMethod)
}
// function suspension works differently !!
@ -248,7 +245,7 @@ internal class RmiClient(val isGlobal: Boolean,
})
} else {
val any = runBlocking {
sendRequest(invokeMethod)
sendRequest(connection.endPoint.actionDispatch, invokeMethod)
}
when (any) {
ResponseManager.TIMEOUT_EXCEPTION -> {

View File

@ -270,7 +270,10 @@ internal class RmiManagerGlobal<CONNECTION: Connection>(private val logger: KLog
is MethodResponse -> {
// notify the pending proxy requests that we have a response!
actionDispatch.launch {
endPoint.responseManager.onRmiMessage(message)
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
endPoint.responseManager.notifyWaiter(rmiId, result)
}
}
}

View File

@ -23,6 +23,7 @@ import com.esotericsoftware.minlog.Log
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.ping.Ping
import dorkbox.network.rmi.CachedMethod
import dorkbox.network.rmi.RmiUtils
import dorkbox.network.rmi.messages.*
@ -334,6 +335,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
kryo.register(Ping::class.java)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, rmiClientSerializer)
@ -377,6 +380,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
kryo.register(MethodRequest::class.java, methodRequestSerializer)
kryo.register(MethodResponse::class.java, methodResponseSerializer)
kryo.register(Ping::class.java)
@Suppress("UNCHECKED_CAST")
kryo.register(InvocationHandler::class.java as Class<Any>, rmiClientSerializer)
@ -420,7 +425,8 @@ open class Serialization<CONNECTION: Connection>(private val references: Boolean
// we have to allow CUSTOM classes to register (where the order does not matter), so that if the CLIENT is the RMI-SERVER, it can
// specify IMPL classes for RMI.
classesToRegister.forEach { registration ->
require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted for RMI. To fix this, remove xx.register(${registration.clazz.name})" }
require(registration is ClassRegistrationForRmi) { "Unable to register a *class* by itself. This is only permitted on the CLIENT for RMI. " +
"To fix this, remove xx.register(${registration.clazz.name})" }
}
@Suppress("UNCHECKED_CAST")

View File

@ -98,7 +98,7 @@ class PingPongTest : BaseTest() {
}
val counter = AtomicInteger(0)
client.onMessage<Data> { message ->
client.onMessage<Data> { _ ->
if (counter.getAndIncrement() <= tries) {
send(data)
} else {

View File

@ -1,59 +1,168 @@
package dorkboxTest.network
import dorkbox.network.Client
import dorkbox.network.Server
import dorkbox.network.connection.Connection
import dorkbox.network.ping.Ping
import dorkbox.network.rmi.RmiUtils
import kotlinx.atomicfu.atomic
import org.junit.Assert
import org.junit.Test
class PingTest : BaseTest() {
@Test
fun onServerPing() {
// val serverSuccess = atomic(false)
// val clientSuccess = atomic(false)
//
// run {
// val configuration = serverConfig()
//
// val server: Server<Connection> = Server(configuration)
// addEndPoint(server)
// server.bind()
//
// server.onPing { ping ->
// serverSuccess.value = true
// println("Ping info ${ping.time}")
// close()
// }
// }
//
// run {
// val config = clientConfig()
//
// val client: Client<Connection> = Client(config)
// addEndPoint(client)
//
// client.onConnect {
// clientSuccess.value = true
//
// it.ping {
// println("received ping back! Val: $time")
// }
// }
//
// client.onDisconnect {
// stopEndPoints()
// }
//
// runBlocking {
// try {
// client.connect(LOOPBACK)
// } catch (e: Exception) {
// stopEndPoints()
// throw e
// }
// }
// }
//
//
// waitForThreads()
//
// Assert.assertTrue(serverSuccess.value)
// Assert.assertTrue(clientSuccess.value)
fun RmiPing() {
val clientSuccess = atomic(false)
run {
val configuration = serverConfig()
val server: Server<Connection> = Server(configuration)
addEndPoint(server)
server.bind()
}
run {
val config = clientConfig()
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onConnect {
ping {
clientSuccess.value = true
logger.error("out-bound: $outbound")
logger.error("in-bound: $inbound")
logger.error("round-trip: $roundtrip")
stopEndPoints()
}
}
client.connect(LOOPBACK)
}
waitForThreads()
Assert.assertTrue(clientSuccess.value)
}
@Test
fun MessagePing() {
val serverSuccess = atomic(false)
val clientSuccess = atomic(false)
run {
val configuration = serverConfig()
configuration.serialization.register(PingMessage::class.java)
val server: Server<Connection> = Server(configuration)
addEndPoint(server)
server.bind()
server.onMessage<PingMessage> { ping ->
serverSuccess.value = true
ping.pongTime = System.currentTimeMillis()
send(ping)
}
}
run {
val config = clientConfig()
val client: Client<Connection> = Client(config)
addEndPoint(client)
client.onMessage<PingMessage> { ping ->
clientSuccess.value = true
ping.finishedTime = System.currentTimeMillis()
logger.error("out-bound: ${ping.outbound}")
logger.error("in-bound: ${ping.inbound}")
logger.error("round-trip: ${ping.roundtrip}")
stopEndPoints()
}
client.onConnect {
logger.error("Connecting...")
val ping = PingMessage()
ping.packedId = 1
ping.pingTime = System.currentTimeMillis()
send(ping)
}
client.connect(LOOPBACK)
}
waitForThreads()
Assert.assertTrue(serverSuccess.value)
Assert.assertTrue(clientSuccess.value)
}
class PingMessage {
var packedId = 0
var pingTime = 0L
var pongTime = 0L
@Transient
var finishedTime = 0L
/**
* The time it took for the remote connection to return the ping to us. This is only accurate if the clocks are synchronized
*/
val inbound: Long
get() {
return finishedTime - pongTime
}
/**
* The time it took for us to ping the remote connection. This is only accurate if the clocks are synchronized.
*/
val outbound: Long
get() {
return pongTime - pingTime
}
/**
* The round-trip time it took to ping the remote connection
*/
val roundtrip: Long
get() {
return finishedTime - pingTime
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as Ping
if (packedId != other.packedId) return false
if (pingTime != other.pingTime) return false
if (pongTime != other.pongTime) return false
if (finishedTime != other.finishedTime) return false
return true
}
override fun hashCode(): Int {
var result = packedId
result = 31 * result + pingTime.hashCode()
result = 31 * result + pongTime.hashCode()
result = 31 * result + finishedTime.hashCode()
return result
}
override fun toString(): String {
return "PingMessage ${RmiUtils.unpackUnsignedRight(packedId)} (pingTime=$pingTime, pongTime=$pongTime, finishedTime=$finishedTime)"
}
}
}