Moved ping() to the connection object

This commit is contained in:
Robinson 2023-09-05 12:57:16 +02:00
parent 769bad6aac
commit d64a4bb1e1
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
3 changed files with 45 additions and 86 deletions

View File

@ -217,8 +217,8 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @return true if the message was successfully sent by aeron
*/
fun ping(pingTimeoutSeconds: Int = endPoint.config.pingTimeoutSeconds, function: Ping.() -> Unit = {}): Boolean {
return endPoint.ping(this, pingTimeoutSeconds, function)
fun ping(function: Ping.() -> Unit = {}): Boolean {
return sendPing(function)
}
/**
@ -417,4 +417,45 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
val other1 = other as Connection
return id == other1.id
}
internal fun receivePing(ping: Ping) {
if (ping.pongTime == 0L) {
// this is on the "remote end".
ping.pongTime = System.currentTimeMillis()
if (!send(ping)) {
logger.error { "Error returning ping: $ping" }
}
} else {
// this is on the "local end" when the response comes back
ping.finishedTime = System.currentTimeMillis()
val rmiId = ping.packedId
// process the ping message so that our ping callback does something
// this will be null if the ping took longer than XXX seconds and was cancelled
val result = EndPoint.responseManager.removeWaiterCallback<Ping.() -> Unit>(rmiId, logger)
if (result != null) {
result(ping)
} else {
logger.error { "Unable to receive ping, there was no waiting response for $ping ($rmiId)" }
}
}
}
internal fun sendPing(function: Ping.() -> Unit): Boolean {
val id = EndPoint.responseManager.prepWithCallback(logger, function)
val ping = Ping()
ping.packedId = id
ping.pingTime = System.currentTimeMillis()
// if there is no ping response EVER, it means that the connection is in a critically BAD state!
// eventually, all the ping replies (or, in our case, the RMI replies that have timed out) will
// become recycled.
// Is it a memory-leak? No, because the memory will **EVENTUALLY** get freed.
return send(ping)
}
}

View File

@ -437,8 +437,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
*
* @return true if the message was successfully sent by aeron
*/
internal fun ping(connection: Connection, pingTimeoutMs: Int, function: Ping.() -> Unit): Boolean {
return pingManager.ping(connection, pingTimeoutMs, responseManager, logger, function)
internal fun ping(connection: Connection, function: Ping.() -> Unit): Boolean {
return connection.sendPing(function)
}
/**

View File

@ -1,82 +0,0 @@
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.ping
import dorkbox.network.connection.Connection
import dorkbox.network.rmi.ResponseManager
import mu.KLogger
import java.util.concurrent.*
/**
* How to handle ping messages
*/
internal class PingManager<CONNECTION : Connection> {
fun manage(connection: CONNECTION, responseManager: ResponseManager, ping: Ping, logger: KLogger) {
if (ping.pongTime == 0L) {
// this is on the server.
ping.pongTime = System.currentTimeMillis()
if (!connection.send(ping)) {
logger.error { "Error returning ping: $ping" }
}
} else {
// this is on the client
ping.finishedTime = System.currentTimeMillis()
val rmiId = ping.packedId
// process the ping message so that our ping callback does something
// this will be null if the ping took longer than XXX seconds and was cancelled
val result = responseManager.getWaiterCallback<Ping.() -> Unit>(rmiId, logger)
if (result != null) {
result(ping)
} else {
logger.error { "Unable to receive ping, there was no waiting response for $ping ($rmiId)" }
}
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
* @return true if the message was successfully sent by aeron
*/
internal fun ping(
connection: Connection,
pingTimeoutSeconds: Int,
responseManager: ResponseManager,
logger: KLogger,
function: Ping.() -> Unit
): Boolean {
val id = responseManager.prepWithCallback(logger, function)
val ping = Ping()
ping.packedId = id
ping.pingTime = System.currentTimeMillis()
// ALWAYS cancel the ping after XXX seconds
responseManager.cancelRequest(TimeUnit.SECONDS.toMillis(pingTimeoutSeconds.toLong()), id, logger) {
// kill the callback, since we are now "cancelled". If there is a race here (and the response comes at the exact same time)
// we don't care since either it will be null or it won't (if it's not null, it will run the callback)
result = null
}
return connection.send(ping)
}
}