diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index 0c568e18..ed9592ca 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -1012,6 +1012,26 @@ open class Client(config: ClientConfiguration = ClientC } } + /** + * Safely sends objects to a destination, where the callback is notified once the remote endpoint has received the message. + * This is to guarantee happens-before, and using this will depend upon APP+NETWORK latency, and is (by design) not as performant as + * sending a regular message! + * + * @return true if the message was sent successfully, false if the connection has been closed + */ + fun send(message: Any, onSuccessCallback: CONNECTION.() -> Unit): Boolean { + val c = connection0 + + return if (c != null) { + @Suppress("UNCHECKED_CAST") + c.send(message, onSuccessCallback as Connection.() -> Unit) + } else { + val exception = TransmitException("Cannot send-sync a message when there is no connection!") + listenerManager.notifyError(exception) + false + } + } + /** * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. * @@ -1023,7 +1043,7 @@ open class Client(config: ClientConfiguration = ClientC val c = connection0 if (c != null) { - return super.ping(c, function) + return c.ping(function) } else { val exception = TransmitException("Cannot send a ping when there is no connection!") listenerManager.notifyError(exception) diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index 53ca1541..41c362c9 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -264,6 +264,19 @@ open class Connection(connectionParameters: ConnectionParams<*>) { return send(message, false) } + + /** + * Safely sends objects to a destination, where the callback is notified once the remote endpoint has received the message. + * + * This is to guarantee happens-before, and using this will depend upon APP+NETWORK latency, and is (by design) not as performant as + * sending a regular message! + * + * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! + */ + fun send(message: Any, onSuccessCallback: Connection.() -> Unit): Boolean { + return sendSync(message, onSuccessCallback) + } + /** * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. * @@ -555,6 +568,55 @@ open class Connection(connectionParameters: ConnectionParams<*>) { return id == other1.id } + internal fun receiveSendSync(sendSync: SendSync) { + if (sendSync.message != null) { + // this is on the "remote end". + sendSync.message = null + + if (!send(sendSync)) { + logger.error("Error returning send-sync: $sendSync") + } + } else { + // this is on the "local end" when the response comes back + val responseId = sendSync.id + + // process the ping message so that our ping callback does something + + // this will be null if the ping took longer than XXX seconds and was cancelled + val result = EndPoint.responseManager.removeWaiterCallback Unit>(responseId, logger) + if (result != null) { + result(this) + } else { + logger.error("Unable to receive send-sync, there was no waiting response for $sendSync ($responseId)") + } + } + } + + + /** + * Safely sends objects to a destination, the callback is notified once the remote endpoint has received the message. + * + * This is to guarantee happens-before, and using this will depend upon APP+NETWORK latency, and is (by design) not as performant as + * sending a regular message! + * + * @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown! + */ + private fun sendSync(message: Any, onSuccessCallback: Connection.() -> Unit): Boolean { + val id = EndPoint.responseManager.prepWithCallback(logger, onSuccessCallback) + + val sendSync = SendSync() + sendSync.message = message + sendSync.id = id + + // if there is no ping response EVER, it means that the connection is in a critically BAD state! + // eventually, all the ping replies (or, in our case, the RMI replies that have timed out) will + // become recycled. + // Is it a memory-leak? No, because the memory will **EVENTUALLY** get freed. + + return send(sendSync, false) + } + + internal fun receivePing(ping: Ping) { if (ping.pongTime == 0L) { // this is on the "remote end". @@ -567,21 +629,21 @@ open class Connection(connectionParameters: ConnectionParams<*>) { // this is on the "local end" when the response comes back ping.finishedTime = System.currentTimeMillis() - val rmiId = ping.packedId + val responseId = ping.packedId // process the ping message so that our ping callback does something // this will be null if the ping took longer than XXX seconds and was cancelled - val result = EndPoint.responseManager.removeWaiterCallback Unit>(rmiId, logger) + val result = EndPoint.responseManager.removeWaiterCallback Unit>(responseId, logger) if (result != null) { result(ping) } else { - logger.error("Unable to receive ping, there was no waiting response for $ping ($rmiId)") + logger.error("Unable to receive ping, there was no waiting response for $ping ($responseId)") } } } - internal fun sendPing(function: Ping.() -> Unit): Boolean { + private fun sendPing(function: Ping.() -> Unit): Boolean { val id = EndPoint.responseManager.prepWithCallback(logger, function) val ping = Ping() diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 2fc8a6a1..8582bc0e 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -492,15 +492,6 @@ abstract class EndPoint private constructor(val type: C listenerManager.onMessage(function) } - /** - * Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection. - * - * @return true if the message was successfully sent by aeron - */ - internal fun ping(connection: Connection, function: Ping.() -> Unit): Boolean { - return connection.sendPing(function) - } - /** * This is designed to permit modifying/overriding how data is processed on the network. * @@ -682,7 +673,7 @@ abstract class EndPoint private constructor(val type: C try { connection.receivePing(message) } catch (e: Exception) { - listenerManager.notifyError(connection, PingException("Error while processing Ping message", e)) + listenerManager.notifyError(connection, PingException("Error while processing Ping message: $message", e)) } } @@ -699,6 +690,33 @@ abstract class EndPoint private constructor(val type: C } } + is SendSync -> { + // SendSync enables us to NOTIFY the remote endpoint that we have received the message. This is to guarantee happens-before! + // Using this will depend upon APP+NETWORK latency, and is (by design) not as performant as sending a regular message! + try { + val message2 = message.message + if (message2 != null) { + // this is on the "remote end". Make sure to dispatch/notify the message BEFORE we send a message back! + try { + var hasListeners = listenerManager.notifyOnMessage(connection, message2) + + // each connection registers, and is polled INDEPENDENTLY for messages. + hasListeners = hasListeners or connection.notifyOnMessage(message2) + + if (!hasListeners) { + listenerManager.notifyError(connection, MessageDispatchException("No send-sync message callbacks found for ${message2::class.java.name}")) + } + } catch (e: Exception) { + listenerManager.notifyError(connection, MessageDispatchException("Error processing send-sync message ${message2::class.java.name}", e)) + } + } + + connection.receiveSendSync(message) + } catch (e: Exception) { + listenerManager.notifyError(connection, SendSyncException("Error while processing send-sync message: $message", e)) + } + } + else -> { try { var hasListeners = listenerManager.notifyOnMessage(connection, message) diff --git a/src/dorkbox/network/connection/SendSync.kt b/src/dorkbox/network/connection/SendSync.kt new file mode 100644 index 00000000..1a5e64fd --- /dev/null +++ b/src/dorkbox/network/connection/SendSync.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.connection + +import dorkbox.network.rmi.RmiUtils + +class SendSync { + var message: Any? = null + + // used to notify the remote endpoint that the message has been processed + var id: Int = 0 + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is SendSync) return false + + if (message != other.message) return false + if (id != other.id) return false + + return true + } + + override fun hashCode(): Int { + var result = message?.hashCode() ?: 0 + result = 31 * result + id + return result + } + + override fun toString(): String { + return "SendSync ${RmiUtils.unpackUnsignedRight(id)} (message=$message)" + } +} diff --git a/src/dorkbox/network/exceptions/SendSyncException.kt b/src/dorkbox/network/exceptions/SendSyncException.kt new file mode 100644 index 00000000..69ccef85 --- /dev/null +++ b/src/dorkbox/network/exceptions/SendSyncException.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.exceptions + +/** + * The type of exceptions raised for send-sync errors + */ +open class SendSyncException : Exception { + /** + * Create an exception. + * + * @param message The message + */ + constructor(message: String) : super(message) + + /** + * Create an exception. + * + * @param cause The cause + */ + constructor(cause: Throwable) : super(cause) + + /** + * Create an exception. + * + * @param message The message + * @param cause The cause + */ + constructor(message: String, cause: Throwable?) : super(message, cause) +} diff --git a/src/dorkbox/network/rmi/ResponseManager.kt b/src/dorkbox/network/rmi/ResponseManager.kt index 3da0d6d3..87c27d37 100644 --- a/src/dorkbox/network/rmi/ResponseManager.kt +++ b/src/dorkbox/network/rmi/ResponseManager.kt @@ -31,7 +31,7 @@ import kotlin.concurrent.write * - these are just looped around in a ring buffer. * - these are stored here as int, however these are REALLY shorts and are int-packed when transferring data on the wire * - * (By default, for RMI...) + * (By default, for RMI/Ping/SendSync...) * - 0 is reserved for INVALID * - 1 is reserved for ASYNC (the response will never be sent back, and we don't wait for it) * @@ -41,7 +41,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int val TIMEOUT_EXCEPTION = TimeoutException().apply { stackTrace = arrayOf() } } - private val rmiWaitersInUse = atomic(0) + private val responseWaitersInUse = atomic(0) private val waiterCache: Pool private val pendingLock = ReentrantReadWriteLock() @@ -49,7 +49,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int init { require(maxValuesInCache <= 65535) { "The maximum size for the values in the response manager is 65535"} - require(maxValuesInCache > minimumValue) { "< $minimumValue (0 and 1 for RMI) are reserved"} + require(maxValuesInCache > minimumValue) { "< $minimumValue (0 and 1 for RMI/Ping/SendSync) are reserved"} require(minimumValue > 1) { "The minimum value $minimumValue must be > 1"} // create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint! @@ -70,6 +70,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int * Called when we receive the answer for our initial request. If no response data, then the pending rmi data entry is deleted * * resume any pending remote object method invocations (if they are not async, or not manually waiting) + * * NOTE: async RMI will never call this (because async doesn't return a response) */ fun notifyWaiter(id: Int, result: Any?, logger: Logger) { @@ -117,7 +118,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int // always return this to the cache! previous.result = null waiterCache.put(previous) - rmiWaitersInUse.getAndDecrement() + responseWaitersInUse.getAndDecrement() return result as T } @@ -132,9 +133,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int */ fun prep(logger: Logger): ResponseWaiter { val waiter = waiterCache.take() - rmiWaitersInUse.getAndIncrement() + responseWaitersInUse.getAndIncrement() if (logger.isTraceEnabled) { - logger.trace("[RM] prep in-use: [${waiter.id}] ${rmiWaitersInUse.value}") + logger.trace("[RM] prep in-use: [${waiter.id}] ${responseWaitersInUse.value}") } // this will initialize the result @@ -154,9 +155,9 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int */ fun prepWithCallback(logger: Logger, function: Any): Int { val waiter = waiterCache.take() - rmiWaitersInUse.getAndIncrement() + responseWaitersInUse.getAndIncrement() if (logger.isTraceEnabled) { - logger.trace("[RM] prep in-use: [${waiter.id}] ${rmiWaitersInUse.value}") + logger.trace("[RM] prep in-use: [${waiter.id}] ${responseWaitersInUse.value}") } // this will initialize the result @@ -199,7 +200,7 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int // always return the waiter to the cache responseWaiter.result = null waiterCache.put(responseWaiter) - rmiWaitersInUse.getAndDecrement() + responseWaitersInUse.getAndDecrement() if (resultOrWaiter is ResponseWaiter) { if (logger.isTraceEnabled) { @@ -228,17 +229,18 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int // always return the waiter to the cache responseWaiter.result = null waiterCache.put(responseWaiter) - rmiWaitersInUse.getAndDecrement() + responseWaitersInUse.getAndDecrement() } + // This is only closed when shutting down the client/server. fun close(logger: Logger) { // technically, this isn't closing it, so much as it's cleaning it out if (logger.isDebugEnabled) { - logger.debug("Closing the response manager for RMI") + logger.debug("Closing the response manager") } // wait for responses, or wait for timeouts! - while (rmiWaitersInUse.value > 0) { + while (responseWaitersInUse.value > 0) { Thread.sleep(50) } diff --git a/src/dorkbox/network/serialization/SendSyncSerializer.kt b/src/dorkbox/network/serialization/SendSyncSerializer.kt new file mode 100644 index 00000000..39480c95 --- /dev/null +++ b/src/dorkbox/network/serialization/SendSyncSerializer.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.serialization + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import dorkbox.network.connection.SendSync + +internal class SendSyncSerializer: Serializer() { + override fun write(kryo: Kryo, output: Output, ssync: SendSync) { + output.writeInt(ssync.id) + kryo.writeClassAndObject(output, ssync.message) + } + + override fun read(kryo: Kryo, input: Input, type: Class): SendSync { + val ssync = SendSync() + ssync.id = input.readInt() + ssync.message = kryo.readClassAndObject(input) + + return ssync + } +} diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 824d7235..36406650 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -25,6 +25,7 @@ import com.esotericsoftware.minlog.Log import dorkbox.network.Server import dorkbox.network.connection.Connection import dorkbox.network.connection.DisconnectMessage +import dorkbox.network.connection.SendSync import dorkbox.network.connection.streaming.StreamingControl import dorkbox.network.connection.streaming.StreamingControlSerializer import dorkbox.network.connection.streaming.StreamingData @@ -182,6 +183,7 @@ open class Serialization(private val references: Boolean private val streamingControlSerializer = StreamingControlSerializer() private val streamingDataSerializer = StreamingDataSerializer() private val pingSerializer = PingSerializer() + private val sendSyncSerializer = SendSyncSerializer() private val disconnectSerializer = DisconnectSerializer() internal val fileContentsSerializer = FileContentsSerializer() @@ -433,6 +435,7 @@ open class Serialization(private val references: Boolean kryo.register(StreamingData::class.java, streamingDataSerializer) kryo.register(Ping::class.java, pingSerializer) + kryo.register(SendSync::class.java, sendSyncSerializer) kryo.register(HandshakeMessage::class.java) kryo.register(DisconnectMessage::class.java, disconnectSerializer) diff --git a/test/dorkboxTest/network/ErrorLoggerTest.kt b/test/dorkboxTest/network/ErrorLoggerTest.kt index b478483f..139287ce 100644 --- a/test/dorkboxTest/network/ErrorLoggerTest.kt +++ b/test/dorkboxTest/network/ErrorLoggerTest.kt @@ -18,7 +18,6 @@ package dorkboxTest.network import dorkbox.network.Client import dorkbox.network.Server import dorkbox.network.connection.Connection -import kotlinx.coroutines.delay import org.junit.Test class ErrorLoggerTest : BaseTest() { diff --git a/test/dorkboxTest/network/SendSyncTest.kt b/test/dorkboxTest/network/SendSyncTest.kt new file mode 100644 index 00000000..d60f2fde --- /dev/null +++ b/test/dorkboxTest/network/SendSyncTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkboxTest.network + +import dorkbox.network.Client +import dorkbox.network.Server +import dorkbox.network.connection.Connection +import kotlinx.atomicfu.atomic +import org.junit.Assert +import org.junit.Test + +class SendSyncTest : BaseTest() { + val counter = atomic(0) + + @Test + fun sendSync() { + // session/stream count errors + val serverSuccess = atomic(false) + val clientSuccess = atomic(false) + + val server = run { + val configuration = serverConfig() + + val server: Server = Server(configuration) + addEndPoint(server) + + server.onMessage { + serverSuccess.value = true + } + + server + } + + val client = run { + val config = clientConfig() + + val client: Client = Client(config) + addEndPoint(client) + + client.onConnect { + repeat(100) { + send("Hi, I'm waiting!") { + // a send-sync object is returned, once the round-trip is complete, and we are notified + val count = counter.getAndIncrement() + if (count == 99) { + clientSuccess.value = true + + stopEndPoints() + } + } + } + } + + client + } + + server.bind(2000) + client.connect(LOCALHOST, 2000) + + waitForThreads() + + Assert.assertTrue(clientSuccess.value) + Assert.assertTrue(serverSuccess.value) + } +}