From cb7f8b2990bbf09aa479ec1f08d483661c816f5a Mon Sep 17 00:00:00 2001 From: Robinson Date: Mon, 4 Dec 2023 10:47:37 +0100 Subject: [PATCH] Send all buffered messages at once, instead of 1-at-a-time. --- src/dorkbox/network/connection/Connection.kt | 16 +++++---- src/dorkbox/network/connection/EndPoint.kt | 9 +++++ .../connection/buffer/BufferedMessages.kt | 21 ++++++++++++ .../connection/buffer/BufferedSerializer.kt | 34 +++++++++++++++++++ .../network/serialization/Serialization.kt | 4 +++ 5 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 src/dorkbox/network/connection/buffer/BufferedMessages.kt create mode 100644 src/dorkbox/network/connection/buffer/BufferedSerializer.kt diff --git a/src/dorkbox/network/connection/Connection.kt b/src/dorkbox/network/connection/Connection.kt index eac7314e..07d9845a 100644 --- a/src/dorkbox/network/connection/Connection.kt +++ b/src/dorkbox/network/connection/Connection.kt @@ -19,6 +19,7 @@ import dorkbox.network.Client import dorkbox.network.Server import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator +import dorkbox.network.connection.buffer.BufferedMessages import dorkbox.network.connection.buffer.BufferedSession import dorkbox.network.ping.Ping import dorkbox.network.rmi.RmiSupportConnection @@ -344,13 +345,16 @@ open class Connection(connectionParameters: ConnectionParams<*>) { internal fun sendBufferedMessages() { if (enableBufferedMessages) { - // now send all buffered/pending messages - if (logger.isDebugEnabled) { - logger.debug("Sending buffered messages: ${bufferedSession.pendingMessagesQueue.size}") - } + val bufferedMessage = BufferedMessages() + val numberDrained = bufferedSession.pendingMessagesQueue.drainTo(bufferedMessage.messages) - bufferedSession.pendingMessagesQueue.forEach { - sendNoBuffer(it) + if (numberDrained > 0) { + // now send all buffered/pending messages + if (logger.isDebugEnabled) { + logger.debug("Sending buffered messages: ${bufferedSession.pendingMessagesQueue.size}") + } + + sendNoBuffer(bufferedMessage) } } } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index 75e3d290..3a0a1dd6 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -24,6 +24,7 @@ import dorkbox.network.ServerConfiguration import dorkbox.network.aeron.AeronDriver import dorkbox.network.aeron.BacklogStat import dorkbox.network.aeron.EventPoller +import dorkbox.network.connection.buffer.BufferedMessages import dorkbox.network.connection.streaming.StreamingControl import dorkbox.network.connection.streaming.StreamingData import dorkbox.network.connection.streaming.StreamingManager @@ -676,6 +677,14 @@ abstract class EndPoint private constructor(val type: C } } + is BufferedMessages -> { + // this can potentially be an EXTREMELY large set of data -- so when there are buffered messages, it is often better + // to batch-send them instead of one-at-a-time (which can cause excessive CPU load and Network I/O) + message.messages.forEach { + processMessageFromChannel(connection, it) + } + } + // small problem... If we expect IN ORDER messages (ie: setting a value, then later reading the value), multiple threads don't work. // this is worked around by having RMI always return (unless async), even with a null value, so the CALLING side of RMI will always // go in "lock step" diff --git a/src/dorkbox/network/connection/buffer/BufferedMessages.kt b/src/dorkbox/network/connection/buffer/BufferedMessages.kt new file mode 100644 index 00000000..ae5462f6 --- /dev/null +++ b/src/dorkbox/network/connection/buffer/BufferedMessages.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.connection.buffer + +class BufferedMessages { + var messages = arrayListOf() +} diff --git a/src/dorkbox/network/connection/buffer/BufferedSerializer.kt b/src/dorkbox/network/connection/buffer/BufferedSerializer.kt new file mode 100644 index 00000000..7211d3db --- /dev/null +++ b/src/dorkbox/network/connection/buffer/BufferedSerializer.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2023 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dorkbox.network.connection.buffer + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output + +internal class BufferedSerializer: Serializer() { + override fun write(kryo: Kryo, output: Output, messages: BufferedMessages) { + kryo.writeClassAndObject(output, messages.messages) + } + + override fun read(kryo: Kryo, input: Input, type: Class): BufferedMessages { + val messages = BufferedMessages() + messages.messages = kryo.readClassAndObject(input) as ArrayList + return messages + } +} diff --git a/src/dorkbox/network/serialization/Serialization.kt b/src/dorkbox/network/serialization/Serialization.kt index 36406650..07646d64 100644 --- a/src/dorkbox/network/serialization/Serialization.kt +++ b/src/dorkbox/network/serialization/Serialization.kt @@ -26,6 +26,8 @@ import dorkbox.network.Server import dorkbox.network.connection.Connection import dorkbox.network.connection.DisconnectMessage import dorkbox.network.connection.SendSync +import dorkbox.network.connection.buffer.BufferedMessages +import dorkbox.network.connection.buffer.BufferedSerializer import dorkbox.network.connection.streaming.StreamingControl import dorkbox.network.connection.streaming.StreamingControlSerializer import dorkbox.network.connection.streaming.StreamingData @@ -185,6 +187,7 @@ open class Serialization(private val references: Boolean private val pingSerializer = PingSerializer() private val sendSyncSerializer = SendSyncSerializer() private val disconnectSerializer = DisconnectSerializer() + private val bufferedMessageSerializer = BufferedSerializer() internal val fileContentsSerializer = FileContentsSerializer() @@ -438,6 +441,7 @@ open class Serialization(private val references: Boolean kryo.register(SendSync::class.java, sendSyncSerializer) kryo.register(HandshakeMessage::class.java) kryo.register(DisconnectMessage::class.java, disconnectSerializer) + kryo.register(BufferedMessages::class.java, bufferedMessageSerializer) @Suppress("UNCHECKED_CAST")