From e09fd43e37d4e720dfd828b64b101ca7fca8e571 Mon Sep 17 00:00:00 2001 From: nathan Date: Tue, 1 Sep 2020 20:36:24 +0200 Subject: [PATCH] Added guarantee of aeron publication log deletion (warning if it cannot delete the file) --- src/dorkbox/network/connection/Connection.kt | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index a59137a3..3519e448 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -32,6 +32,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.agrona.DirectBuffer +import java.io.IOException import java.util.concurrent.TimeUnit /** @@ -188,7 +189,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) { internal fun pollSubscriptions(): Int { // NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment. // `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)` - return subscription.poll(messageHandler, 2) + return subscription.poll(messageHandler, 1) } /** @@ -303,7 +304,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) { subscription.close() - val closeTimeoutTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong()) + val timeOut = TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong()) + var closeTimeoutTime = System.currentTimeMillis() + timeOut // we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything // we want a timeout-check, otherwise this will run forever @@ -311,8 +313,23 @@ open class Connection(connectionParameters: ConnectionParams<*>) { delay(100) } + // on close, we want to make sure this file is DELETED! + val logFile = endPoint.mediaDriverContext!!.aeronDirectory().resolve("publications").resolve("${publication.registrationId()}.logbuffer") + publication.close() + + closeTimeoutTime = System.currentTimeMillis() + timeOut + while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) { + if (logFile.delete()) { + break + } + } + + if (logFile.exists()) { + listenerManager.value?.notifyError(this, IOException("Unable to delete aeron publication log on close: $logFile")) + } + rmiConnectionSupport.clearProxyObjects() // this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback