various steps to optimize RMI calls (they are ~1.5x as slow as standard message passing

This commit is contained in:
Robinson 2023-09-07 18:08:22 +02:00
parent 56a42e5b7f
commit fa03be5e89
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 103 additions and 68 deletions

View File

@ -180,7 +180,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
*
* @return the result (can be null) or timeout exception
*/
fun waitForReply(
fun getReply(
responseWaiter: ResponseWaiter,
timeoutMillis: Long,
logger: KLogger,
@ -188,33 +188,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
): Any? {
val id = RmiUtils.unpackUnsignedRight(responseWaiter.id)
logger.trace { "[RM] wait: $id" }
// NOTE: we ALWAYS send a response from the remote end (except when async).
//
// 'async' -> DO NOT WAIT (no response)
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) {
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for:
// A) get response
// B) timeout
responseWaiter.doWait(timeoutMillis)
// if we timeout, it doesn't matter since we'll be removing the waiter from the array anyways,
// so no signal can occur, or a signal won't matter
} else {
// wait for the response --- THIS WAITS FOREVER (there is no timeout)!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for one
// A) get response
responseWaiter.doWait()
}
logger.trace { "[RM] get: $id" }
// deletes the entry in the map
val resultOrWaiter = pendingLock.write {

View File

@ -20,6 +20,9 @@ import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
data class ResponseWaiter(val id: Int) {
// @Volatile
// private var latch = dorkbox.util.sync.CountDownLatch(1)
private val lock = ReentrantLock()
private val condition = lock.newCondition()
@ -32,12 +35,14 @@ data class ResponseWaiter(val id: Int) {
*/
fun prep() {
result = null
// latch = dorkbox.util.sync.CountDownLatch(1)
}
/**
* Waits until another thread invokes "doWait"
*/
fun doNotify() {
// latch.countDown()
try {
lock.withLock {
condition.signal()
@ -50,6 +55,7 @@ data class ResponseWaiter(val id: Int) {
* Waits a specific amount of time until another thread invokes "doNotify"
*/
fun doWait() {
// latch.await()
try {
lock.withLock {
condition.await()
@ -62,6 +68,7 @@ data class ResponseWaiter(val id: Int) {
* Waits a specific amount of time until another thread invokes "doNotify"
*/
fun doWait(timeout: Long) {
// latch.await(timeout, TimeUnit.MILLISECONDS)
try {
lock.withLock {
condition.await(timeout, TimeUnit.MILLISECONDS)

View File

@ -19,7 +19,7 @@ import dorkbox.network.connection.Connection
import dorkbox.network.connection.EndPoint
import dorkbox.network.rmi.messages.MethodRequest
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield
import mu.KLogger
@ -178,43 +178,6 @@ internal class RmiClient(val isGlobal: Boolean,
@Volatile private var enableHashCode = false
@Volatile private var enableEquals = false
// if we are ASYNC, then this method immediately returns
private fun sendRequest(isAsync: Boolean, invokeMethod: MethodRequest, logger: KLogger): Any? {
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
// 2) make the "client" side wait for execution response (from the "server"). <--- this is what we are using.
//
// Because we have to ALWAYS make the client wait (unless 'isAsync' is true), we will always be returning, and will always have a
// response (even if it is a void response). This simplifies our response mask, and lets us use more bits for storing the
// response ID
// NOTE: we ALWAYS send a response from the remote end (except when async).
//
// 'async' -> DO NOT WAIT (no response)
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
invokeMethod.isGlobal = isGlobal
return if (isAsync) {
// If we are async, we ignore the response (don't invoke the response manager at all)....
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI)
connection.send(invokeMethod)
null
} else {
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep -- unless it is ASYNC)
val rmiWaiter = responseManager.prep(logger)
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, rmiWaiter.id)
connection.send(invokeMethod)
// NOTE: this is blocking!
responseManager.waitForReply(rmiWaiter, timeoutMillis, logger, connection)
}
}
@Suppress("DuplicatedCode", "UNCHECKED_CAST")
/**
* @throws Exception
@ -311,6 +274,43 @@ internal class RmiClient(val isGlobal: Boolean,
// this should be accessed via the KRYO class ID + method index (both are SHORT, and can be packed)
invokeMethod.cachedMethod = cachedMethods.first { it.method == method }
// there is a STRANGE problem, where if we DO NOT respond/reply to method invocation, and immediate invoke multiple methods --
// the "server" side can have out-of-order method invocation. There are 2 ways to solve this
// 1) make the "server" side single threaded
// 2) make the "client" side wait for execution response (from the "server"). <--- this is what we are using.
//
// Because we have to ALWAYS make the client wait (unless 'isAsync' is true), we will always be returning, and will always have a
// response (even if it is a void response). This simplifies our response mask, and lets us use more bits for storing the
// response ID
// NOTE: we ALWAYS send a response from the remote end (except when async).
//
// 'async' -> DO NOT WAIT (no response)
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
invokeMethod.isGlobal = isGlobal
if (localAsync) {
// If we are async, we ignore the response (don't invoke the response manager at all)....
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI)
connection.send(invokeMethod)
return null
}
//
// this is all SYNC code
//
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep -- unless it is ASYNC)
val responseWaiter = responseManager.prep(connection.logger)
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, responseWaiter.id)
connection.send(invokeMethod)
// if a 'suspend' function is called, then our last argument is a 'Continuation' object
// We will use this for our coroutine context instead of running on a new coroutine
@ -321,8 +321,36 @@ internal class RmiClient(val isGlobal: Boolean,
val continuation = suspendCoroutineArg as Continuation<Any?>
val suspendFunction: suspend () -> Any? = {
// NOTE: once something `ELSE` is suspending, we can remove the `yield`
yield() // if this is not here, it will not work (something must actually suspend!)
sendRequest(localAsync, invokeMethod, connection.logger)
// NOTE: this is blocking!
// NOTE: we ALWAYS send a response from the remote end (except when async).
//
// 'async' -> DO NOT WAIT (no response)
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) {
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for:
// A) get response
// B) timeout
responseWaiter.doWait(timeoutMillis)
// if we timeout, it doesn't matter since we'll be removing the waiter from the array anyways,
// so no signal can occur, or a signal won't matter
} else {
// wait for the response --- THIS WAITS FOREVER (there is no timeout)!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for one
// A) get response
responseWaiter.doWait()
}
responseManager.getReply(responseWaiter, timeoutMillis, connection.logger, connection)
}
// function suspension works differently. THIS IS A TRAMPOLINE TO CALL SUSPEND !!
@ -350,7 +378,33 @@ internal class RmiClient(val isGlobal: Boolean,
}
})
} else {
val any = sendRequest(localAsync, invokeMethod, connection.logger)
// NOTE: this is blocking!
// NOTE: we ALWAYS send a response from the remote end (except when async).
//
// 'async' -> DO NOT WAIT (no response)
// 'timeout > 0' -> WAIT w/ TIMEOUT
// 'timeout == 0' -> WAIT FOREVER
if (timeoutMillis > 0) {
// wait for the response.
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for:
// A) get response
// B) timeout
responseWaiter.doWait(timeoutMillis)
// if we timeout, it doesn't matter since we'll be removing the waiter from the array anyways,
// so no signal can occur, or a signal won't matter
} else {
// wait for the response --- THIS WAITS FOREVER (there is no timeout)!
//
// If the response is ALREADY here, the doWait() returns instantly (with result)
// if no response yet, it will wait for one
// A) get response
responseWaiter.doWait()
}
val any = responseManager.getReply(responseWaiter, timeoutMillis, connection.logger, connection)
when (any) {
ResponseManager.TIMEOUT_EXCEPTION -> {
val fancyName = RmiUtils.makeFancyMethodName(method)