Renaming RmiResponseManager for a more general use case (ResponseManager)

This commit is contained in:
nathan 2020-09-22 17:04:42 +02:00
parent 8f39813ccf
commit 7d67cc86b7
10 changed files with 100 additions and 32 deletions

View File

@ -30,7 +30,7 @@ import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRejectedException
import dorkbox.network.exceptions.ClientTimedOutException
import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.other.coroutines.SuspendWaiter
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.RmiManagerConnections

View File

@ -28,7 +28,7 @@ import dorkbox.network.connection.ListenerManager
import dorkbox.network.connectionType.ConnectionRule
import dorkbox.network.exceptions.ServerException
import dorkbox.network.handshake.ServerHandshake
import dorkbox.network.other.coroutines.SuspendWaiter
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.rmi.RemoteObject
import dorkbox.network.rmi.RemoteObjectStorage
import dorkbox.network.rmi.TimeoutException

View File

@ -21,11 +21,12 @@ import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronConfig
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.coroutines.SuspendWaiter
import dorkbox.network.exceptions.MessageNotRegisteredException
import dorkbox.network.handshake.HandshakeMessage
import dorkbox.network.ipFilter.IpFilterRule
import dorkbox.network.other.coroutines.SuspendWaiter
import dorkbox.network.ping.PingMessage
import dorkbox.network.rmi.ResponseManager
import dorkbox.network.rmi.RmiManagerConnections
import dorkbox.network.rmi.RmiManagerGlobal
import dorkbox.network.rmi.messages.RmiMessage
@ -103,7 +104,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
// we only want one instance of these created. These will be called appropriately
val settingsStore: SettingsStore
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch, config.serialization)
private val responseManager = ResponseManager(logger, actionDispatch)
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, responseManager, config.serialization)
init {
require(!config.previouslyUsed) { "${type.simpleName} configuration cannot be reused!" }

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.other.coroutines;
package dorkbox.network.coroutines;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

View File

@ -1,4 +1,4 @@
package dorkbox.network.other.coroutines
package dorkbox.network.coroutines
import kotlinx.coroutines.channels.Channel

View File

@ -32,7 +32,7 @@ import kotlin.concurrent.write
*
* response ID's and the memory they hold will leak if the response never arrives!
*/
internal class RmiResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
internal class ResponseManager(private val logger: KLogger, private val actionDispatch: CoroutineScope) {
companion object {
val TIMEOUT_EXCEPTION = Exception()
val ASYNC_WAITER = ResponseWaiter(RemoteObjectStorage.ASYNC_RMI) // this is never waited on, we just need this to optimize how we assigned waiters.
@ -47,7 +47,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// 1 is reserved for ASYNC
private val maxValuesInCache = 65535
private val rmiWaitersInUse = atomic(0)
private val rmiWaiterCache = Channel<ResponseWaiter>(maxValuesInCache)
private val waiterCache = Channel<ResponseWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock()
private val pending = arrayOfNulls<Any?>(maxValuesInCache+1) // +1 because it's possible to have the value 65535 in the cache
@ -73,15 +73,17 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// populate the array of randomly assigned ID's + waiters. This can happen in a new thread
actionDispatch.launch {
for (it in ids) {
rmiWaiterCache.offer(ResponseWaiter(it))
waiterCache.offer(ResponseWaiter(it))
}
}
}
// resume any pending remote object method invocations (if they are not async, or not manually waiting)
// async RMI will never get here!
suspend fun onMessage(message: MethodResponse) {
/**
* resume any pending remote object method invocations (if they are not async, or not manually waiting)
* NOTE: async RMI will never call this (because async doesn't return a response)
*/
suspend fun onRmiMessage(message: MethodResponse) {
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
@ -103,7 +105,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
}
/**
* gets the RmiWaiter (id + waiter).
* gets the ResponseWaiter (id + waiter) and prepares the pending response map
*
* We ONLY care about the ID to get the correct response info. If there is no response, the ID can be ignored.
*/
@ -111,7 +113,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
return if (isAsync) {
ASYNC_WAITER
} else {
val responseRmi = rmiWaiterCache.receive()
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
@ -185,7 +187,7 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
}
// always return the waiter to the cache
rmiWaiterCache.send(responseWaiter)
waiterCache.send(responseWaiter)
rmiWaitersInUse.getAndDecrement()
if (resultOrWaiter is ResponseWaiter) {
@ -197,13 +199,81 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
return resultOrWaiter
}
///////
///////
//// Callback waiting/execution. This is part of this class only because we want to use the SAME RMI SYSTEM for other things,
//// namely, being able to have an easier time setting up message responses
///////
///////
/**
* on response, runs the waiting callback
* NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone)
*/
suspend fun onCallbackMessage(message: MethodResponse) {
val rmiId = RmiUtils.unpackUnsignedRight(message.packedId)
val result = message.result
logger.trace { "RMI return: $rmiId" }
val previous = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = result
previous
}
// if NULL, since either we don't exist (because we were async), or it was cancelled
if (previous is ResponseWaiter) {
logger.trace { "RMI valid-cancel: $rmiId" }
// this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify()
}
}
/**
* gets the ResponseWaiter (id + waiter) and prepares the pending response map
*
* We ONLY care about the ID to get the correct response info.
* NOTE: This uses the RMI-ID mechanism to know what the response ID is (the name is left alone)
*/
internal suspend fun prepForCallback(callback: (Any) -> Unit): ResponseWaiter {
val responseRmi = waiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI2 count: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
// assign the callback that will be notified when the return message is received
responseRmi.result = callback
pendingLock.write {
// this just does a .toUShort().toInt() conversion. This is cleaner than doing it manually
pending[RmiUtils.unpackUnsignedRight(responseRmi.id)] = responseRmi
}
return responseRmi
}
suspend fun close() {
// wait for responses, or wait for timeouts!
while (rmiWaitersInUse.value > 0) {
delay(100)
}
rmiWaiterCache.close()
waiterCache.close()
pendingLock.write {
pending.forEachIndexed { index, _ ->

View File

@ -30,10 +30,9 @@ internal data class ResponseWaiter(val id: Int) {
var channel: Channel<Unit> = Channel(Channel.RENDEZVOUS)
var isCancelled = false
// holds the RMI result. This is ALWAYS accessed from within a lock!
// holds the RMI result or callback. This is ALWAYS accessed from within a lock!
var result: Any? = null
/**
* this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
*/

View File

@ -50,7 +50,7 @@ internal class RmiClient(val isGlobal: Boolean,
val rmiObjectId: Int,
private val connection: Connection,
private val proxyString: String,
private val responseManager: RmiResponseManager,
private val responseManager: ResponseManager,
private val cachedMethods: Array<CachedMethod>) : InvocationHandler {
companion object {
@ -256,7 +256,7 @@ internal class RmiClient(val isGlobal: Boolean,
return (suspendFunction as Function1<Continuation<Any?>, Any?>).invoke(Continuation(EmptyCoroutineContext) {
val any = it.getOrNull()
when (any) {
RmiResponseManager.TIMEOUT_EXCEPTION -> {
ResponseManager.TIMEOUT_EXCEPTION -> {
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack
@ -279,7 +279,7 @@ internal class RmiClient(val isGlobal: Boolean,
sendRequest(invokeMethod)
}
when (any) {
RmiResponseManager.TIMEOUT_EXCEPTION -> {
ResponseManager.TIMEOUT_EXCEPTION -> {
val fancyName = RmiUtils.makeFancyMethodName(method)
val exception = TimeoutException("Response timed out: $fancyName")
// from top down, clean up the coroutine stack

View File

@ -58,7 +58,7 @@ internal class RmiManagerConnections<CONNECTION: Connection>(logger: KLogger,
proxyObject = RmiManagerGlobal.createProxyObject(false,
connection,
serialization,
rmiGlobalSupport.rmiResponseManager,
rmiGlobalSupport.responseManager,
kryoId,
rmiId,
interfaceClass)

View File

@ -26,15 +26,14 @@ import dorkbox.network.rmi.messages.MethodRequest
import dorkbox.network.rmi.messages.MethodResponse
import dorkbox.network.serialization.Serialization
import dorkbox.util.classes.ClassHelper
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import mu.KLogger
import java.lang.reflect.Proxy
import java.util.*
internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
actionDispatch: CoroutineScope,
internal val serialization: Serialization) : RmiObjectCache(logger) {
internal val responseManager: ResponseManager,
private val serialization: Serialization) : RmiObjectCache(logger) {
companion object {
/**
@ -55,7 +54,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
internal fun createProxyObject(isGlobalObject: Boolean,
connection: Connection,
serialization: Serialization,
responseManager: RmiResponseManager,
responseManager: ResponseManager,
kryoId: Int,
rmiId: Int,
interfaceClass: Class<*>): RemoteObject {
@ -78,8 +77,6 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
}
}
val rmiResponseManager = RmiResponseManager(logger, actionDispatch)
// this is used for all connection specific ones as well.
private val remoteObjectCreationCallbacks = RemoteObjectStorage(logger)
@ -116,7 +113,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
}
suspend fun close() {
rmiResponseManager.close()
responseManager.close()
remoteObjectCreationCallbacks.close()
}
@ -143,7 +140,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
var proxyObject = connection.rmiConnectionSupport.getProxyObject(rmiId)
if (proxyObject == null) {
val kryoId = endPoint.serialization.getKryoIdForRmiClient(interfaceClass)
proxyObject = createProxyObject(isGlobal, connection, serialization, rmiResponseManager, kryoId, rmiId, interfaceClass)
proxyObject = createProxyObject(isGlobal, connection, serialization, responseManager, kryoId, rmiId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(rmiId, proxyObject)
}
@ -170,7 +167,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
// so we can just instantly create the proxy object (or get the cached one). This MUST be an object that is saved for the connection
var proxyObject = connection.rmiConnectionSupport.getProxyObject(objectId)
if (proxyObject == null) {
proxyObject = createProxyObject(true, connection, serialization, rmiResponseManager, kryoId, objectId, interfaceClass)
proxyObject = createProxyObject(true, connection, serialization, responseManager, kryoId, objectId, interfaceClass)
connection.rmiConnectionSupport.saveProxyObject(objectId, proxyObject)
}
@ -346,7 +343,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
}
is MethodResponse -> {
// notify the pending proxy requests that we have a response!
rmiResponseManager.onMessage(message)
responseManager.onRmiMessage(message)
}
}
}