Added guarantee of aeron publication log deletion (warning if it cannot delete the file)

This commit is contained in:
nathan 2020-09-01 20:36:24 +02:00
parent 3ade7f229e
commit e09fd43e37

View File

@ -32,6 +32,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer
import java.io.IOException
import java.util.concurrent.TimeUnit
/**
@ -188,7 +189,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
internal fun pollSubscriptions(): Int {
// NOTE: regarding fragment limit size. Repeated calls to '.poll' will reassemble a fragment.
// `.poll(handler, 4)` == `.poll(handler, 2)` + `.poll(handler, 2)`
return subscription.poll(messageHandler, 2)
return subscription.poll(messageHandler, 1)
}
/**
@ -303,7 +304,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
subscription.close()
val closeTimeoutTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
val timeOut = TimeUnit.SECONDS.toMillis(endPoint.config.connectionCloseTimeoutInSeconds.toLong())
var closeTimeoutTime = System.currentTimeMillis() + timeOut
// we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything
// we want a timeout-check, otherwise this will run forever
@ -311,8 +313,23 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
delay(100)
}
// on close, we want to make sure this file is DELETED!
val logFile = endPoint.mediaDriverContext!!.aeronDirectory().resolve("publications").resolve("${publication.registrationId()}.logbuffer")
publication.close()
closeTimeoutTime = System.currentTimeMillis() + timeOut
while (logFile.exists() && System.currentTimeMillis() < closeTimeoutTime) {
if (logFile.delete()) {
break
}
}
if (logFile.exists()) {
listenerManager.value?.notifyError(this, IOException("Unable to delete aeron publication log on close: $logFile"))
}
rmiConnectionSupport.clearProxyObjects()
// this always has to be on a new dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback