Fixed memory leak wrt RMI waiters

This commit is contained in:
nathan 2020-09-09 12:19:27 +02:00
parent db992d98a9
commit 06a35ed027
3 changed files with 68 additions and 32 deletions

View File

@ -115,7 +115,7 @@ internal class RmiManagerGlobal<CONNECTION : Connection>(logger: KLogger,
return success as T?
}
fun close() {
suspend fun close() {
rmiResponseManager.close()
remoteObjectCreationCallbacks.close()
}

View File

@ -16,7 +16,9 @@
package dorkbox.network.rmi
import dorkbox.network.rmi.messages.MethodResponse
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@ -44,10 +46,11 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// 0 is reserved for INVALID
// 1 is reserved for ASYNC
private val maxValuesInCache = 65535
private val rmiWaitersInUse = atomic(0)
private val rmiWaiterCache = Channel<RmiWaiter>(maxValuesInCache)
private val pendingLock = ReentrantReadWriteLock()
private val pending = arrayOfNulls<Any>(maxValuesInCache)
private val pending = arrayOfNulls<RmiWaiter>(maxValuesInCache)
init {
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
@ -67,9 +70,11 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
}
ids.shuffle()
// populate the array of randomly assigned ID's + waiters.
for (it in ids) {
rmiWaiterCache.offer(RmiWaiter(it))
// populate the array of randomly assigned ID's + waiters. This can happen in a new thread
actionDispatch.launch {
for (it in ids) {
rmiWaiterCache.offer(RmiWaiter(it))
}
}
}
@ -82,22 +87,22 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
logger.trace { "RMI return: $rmiId" }
val previous = pendingLock.write {
val prev = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = result
previous
if (previous != null) {
// if NULL, since either we don't exist (because we were async), or it was cancelled
logger.trace { "RMI valid-cancel: $rmiId" }
// this means we were NOT timed out!
previous.result = result
previous
} else {
null
}
}
// if NULL, since either we don't exist (because we were async), or it was cancelled
if (previous is RmiWaiter) {
logger.trace { "RMI valid-cancel: $rmiId" }
// this means we were NOT timed out! (we cannot be timed out here)
previous.doNotify()
// since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.send(previous)
}
prev?.doNotify()
}
/**
@ -110,6 +115,8 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
ASYNC_WAITER
} else {
val responseRmi = rmiWaiterCache.receive()
rmiWaitersInUse.getAndIncrement()
logger.trace { "RMI count: ${rmiWaitersInUse.value}" }
// this will replace the waiter if it was cancelled (waiters are not valid if cancelled)
responseRmi.prep()
@ -141,12 +148,25 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// 'timeout <= 0' -> same as async (DO NOT WAIT)
val responseTimeoutJob = actionDispatch.launch {
@Suppress("EXPERIMENTAL_API_USAGE")
val responseTimeoutJob = actionDispatch.launch(start = CoroutineStart.UNDISPATCHED) {
// NOTE: UNDISPATCHED means that this coroutine will start when `rmiWaiter.doWait()` is called (the first suspension point)
// we want this behavior INSTEAD OF automatically starting this on a new thread.
delay(timeoutMillis) // this will always wait. if this job is cancelled, this will immediately stop waiting
// check if we have a result or not
val maybeResult = pendingLock.read { pending[rmiId] }
if (maybeResult is RmiWaiter) {
val maybeResult = pendingLock.read {
val prev = pending[rmiId]
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
if (prev!!.result == null) {
prev
} else {
null
}
}
if (maybeResult != null) {
// maybeResult cannot be null, because the only thing that removes it is after the job cancel!
logger.trace { "RMI timeout ($timeoutMillis) cancel: $rmiId" }
maybeResult.cancel()
@ -165,32 +185,45 @@ internal class RmiResponseManager(private val logger: KLogger, private val actio
// B) timeout
rmiWaiter.doWait()
val resultOrWaiter = pendingLock.write {
val previous = pending[rmiId]
pending[rmiId] = null
previous
}
// always cancel the timeout
responseTimeoutJob.cancel()
if (resultOrWaiter is RmiWaiter) {
val pendingResult = pendingLock.write {
// inside a lock because we HAVE to already use a memory fence, might as well reuse it instead of an extra @Volatile
val previous = pending[rmiId]
pending[rmiId] = null
// previous cannot be null, because we are the only thing that removes it!
val prevResult = previous!!.result
previous.result = null
prevResult
}
// always return the waiter to the cache
rmiWaiterCache.send(rmiWaiter)
rmiWaitersInUse.getAndDecrement()
if (pendingResult == null) {
logger.trace { "RMI was canceled ($timeoutMillis): $rmiId" }
// since this was the FIRST one to trigger, return it to the cache.
rmiWaiterCache.send(resultOrWaiter)
return TIMEOUT_EXCEPTION
}
return resultOrWaiter
return pendingResult
}
fun close() {
suspend fun close() {
// wait for responses, or wait for timeouts!
while (rmiWaitersInUse.value > 0) {
delay(100)
}
rmiWaiterCache.close()
pendingLock.write {
pending.forEachIndexed { index, _ ->
pending[index] = null
}
}
rmiWaiterCache.close()
}
}

View File

@ -30,6 +30,9 @@ internal data class RmiWaiter(val id: Int) {
var channel: Channel<Unit> = Channel(0)
var isCancelled = false
// holds the RMI result. This is ALWAYS accessed from within a lock!
var result: Any? = null
/**
* this will replace the waiter if it was cancelled (waiters are not valid if cancelled)