Added support for sessions
This commit is contained in:
parent
3e9109b4c7
commit
e6b4cbd386
|
@ -29,6 +29,8 @@ import dorkbox.network.connection.*
|
|||
import dorkbox.network.connection.IpInfo.Companion.formatCommonAddress
|
||||
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
|
||||
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTraceInternal
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
import dorkbox.network.connection.session.SessionManager
|
||||
import dorkbox.network.exceptions.*
|
||||
import dorkbox.network.handshake.ClientConnectionDriver
|
||||
import dorkbox.network.handshake.ClientHandshake
|
||||
|
@ -139,6 +141,10 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
@Volatile
|
||||
private var connection0: CONNECTION? = null
|
||||
|
||||
|
||||
@Volatile
|
||||
private var pendingMessagesAssigned = false
|
||||
|
||||
private val string0: String by lazy {
|
||||
"EndPoint [Client: ${storage.publicKey.toHexString()}]"
|
||||
}
|
||||
|
@ -695,6 +701,21 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
// is rogue, we do not want to carelessly provide info.
|
||||
|
||||
|
||||
|
||||
// should we queue messages during a reconnect? This is important if the client/server connection is unstable
|
||||
if (!pendingMessagesAssigned) {
|
||||
sessionManager = if (connectionInfo.enableSession) {
|
||||
pendingMessagesAssigned = true
|
||||
SessionManager(config, aeronDriver, connectionInfo.sessionTimeout)
|
||||
}
|
||||
else {
|
||||
pendingMessagesAssigned = true
|
||||
// this is a NO-OP version! We do not want if/else checks for every message!
|
||||
SessionManager.Companion.NoOp(config, aeronDriver)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////
|
||||
//// RMI
|
||||
///////////////
|
||||
|
@ -755,6 +776,12 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
connectionInfo.secretKey
|
||||
))
|
||||
|
||||
|
||||
if (sessionManager.enabled()) {
|
||||
require(newConnection is SessionConnection) { "The new connection does not inherit a SessionConnection, unable to continue. " }
|
||||
}
|
||||
|
||||
|
||||
if (!handshakeConnection.pubSub.isIpc) {
|
||||
// NOTE: Client can ALWAYS connect to the server. The server makes the decision if the client can connect or not.
|
||||
if (logger.isInfoEnabled) {
|
||||
|
@ -789,8 +816,16 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
|
||||
newConnection.setImage()
|
||||
|
||||
// in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session
|
||||
var newSession = true
|
||||
if (sessionManager.enabled()) {
|
||||
newSession = sessionManager.onInit(newConnection as SessionConnection)
|
||||
}
|
||||
|
||||
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
|
||||
listenerManager.notifyInit(newConnection)
|
||||
if (newSession) {
|
||||
listenerManager.notifyInit(newConnection)
|
||||
}
|
||||
|
||||
|
||||
// if we shutdown/close before the poller starts, we don't want to block forever
|
||||
|
@ -867,7 +902,11 @@ open class Client<CONNECTION : Connection>(config: ClientConfiguration = ClientC
|
|||
}
|
||||
})
|
||||
|
||||
listenerManager.notifyConnect(newConnection)
|
||||
if (newSession) {
|
||||
listenerManager.notifyConnect(newConnection)
|
||||
} else {
|
||||
(newConnection as SessionConnection).sendPendingMessages()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -69,6 +69,31 @@ class ServerConfiguration : dorkbox.network.Configuration() {
|
|||
field = value
|
||||
}
|
||||
|
||||
/**
|
||||
* If a connection is in a temporal state (in the middle of a reconnect), should the messages be saved in a `pending` queue, which will
|
||||
* be sent once the connection (with the client/server) is re-established. After a connection has been disconnected for longer than
|
||||
* `sessionTimeoutSeconds`, then the session will be closed.
|
||||
*
|
||||
* The server will send this configuration to the client
|
||||
*/
|
||||
var enableSessionManagement = false
|
||||
set(value) {
|
||||
require(!contextDefined) { errorMessage }
|
||||
field = value
|
||||
}
|
||||
|
||||
/**
|
||||
* How long before a session times out, after the connection holding the session is disconnected?
|
||||
*
|
||||
* The server will send this configuration to the client
|
||||
*/
|
||||
var sessionTimeoutSeconds = TimeUnit.MINUTES.toSeconds(2)
|
||||
set(value) {
|
||||
require(!contextDefined) { errorMessage }
|
||||
field = value
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Allows the user to change how endpoint settings and public key information are saved.
|
||||
*/
|
||||
|
@ -107,6 +132,8 @@ class ServerConfiguration : dorkbox.network.Configuration() {
|
|||
config.listenIpAddress = listenIpAddress
|
||||
config.maxClientCount = maxClientCount
|
||||
config.maxConnectionsPerIpAddress = maxConnectionsPerIpAddress
|
||||
config.enableSessionManagement = enableSessionManagement
|
||||
config.sessionTimeoutSeconds = sessionTimeoutSeconds
|
||||
config.settingsStore = settingsStore
|
||||
|
||||
super.copy(config)
|
||||
|
@ -122,6 +149,8 @@ class ServerConfiguration : dorkbox.network.Configuration() {
|
|||
if (listenIpAddress != other.listenIpAddress) return false
|
||||
if (maxClientCount != other.maxClientCount) return false
|
||||
if (maxConnectionsPerIpAddress != other.maxConnectionsPerIpAddress) return false
|
||||
if (enableSessionManagement != other.enableSessionManagement) return false
|
||||
if (sessionTimeoutSeconds != other.sessionTimeoutSeconds) return false
|
||||
if (settingsStore != other.settingsStore) return false
|
||||
|
||||
return true
|
||||
|
@ -132,6 +161,8 @@ class ServerConfiguration : dorkbox.network.Configuration() {
|
|||
result = 31 * result + listenIpAddress.hashCode()
|
||||
result = 31 * result + maxClientCount
|
||||
result = 31 * result + maxConnectionsPerIpAddress
|
||||
result = 31 * result + enableSessionManagement.hashCode()
|
||||
result = 31 * result + sessionTimeoutSeconds.hashCode()
|
||||
result = 31 * result + settingsStore.hashCode()
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import dorkbox.network.aeron.*
|
|||
import dorkbox.network.connection.*
|
||||
import dorkbox.network.connection.IpInfo.Companion.IpListenType
|
||||
import dorkbox.network.connection.ListenerManager.Companion.cleanStackTrace
|
||||
import dorkbox.network.connection.session.SessionManager
|
||||
import dorkbox.network.connectionType.ConnectionRule
|
||||
import dorkbox.network.exceptions.ServerException
|
||||
import dorkbox.network.handshake.ServerHandshake
|
||||
|
@ -104,6 +105,17 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
|
|||
"EndPoint [Server: ${storage.publicKey.toHexString()}]"
|
||||
}
|
||||
|
||||
init {
|
||||
sessionManager = if (config.enableSessionManagement) {
|
||||
SessionManager(config, aeronDriver, config.sessionTimeoutSeconds)
|
||||
}
|
||||
else {
|
||||
// this is a NO-OP version! We do not want if/else checks for every message!
|
||||
SessionManager.Companion.NoOp(config, aeronDriver)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
final override fun newException(message: String, cause: Throwable?): Throwable {
|
||||
// +2 because we do not want to see the stack for the abstract `newException`
|
||||
val serverException = ServerException(message, cause)
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dorkbox.network
|
||||
|
||||
import dorkbox.network.connection.ConnectionParams
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
|
||||
class SessionClient<CONNECTION: SessionConnection>(config: ClientConfiguration = ClientConfiguration(), loggerName: String = Client::class.java.simpleName):
|
||||
Client<CONNECTION>(config, loggerName) {
|
||||
|
||||
override fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return SessionConnection(connectionParameters) as CONNECTION
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dorkbox.network
|
||||
|
||||
import dorkbox.network.connection.ConnectionParams
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
|
||||
class SessionServer<CONNECTION: SessionConnection>(config: ServerConfiguration = ServerConfiguration(), loggerName: String = Server::class.java.simpleName):
|
||||
Server<CONNECTION>(config, loggerName) {
|
||||
override fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return SessionConnection(connectionParameters) as CONNECTION
|
||||
}
|
||||
}
|
|
@ -66,7 +66,7 @@ fun ChannelUriStringBuilder.endpoint(isIpv4: Boolean, addressString: String, por
|
|||
/**
|
||||
* Class for managing the Aeron+Media drivers
|
||||
*/
|
||||
class AeronDriver constructor(config: Configuration, val logger: Logger, val endPoint: EndPoint<*>?) {
|
||||
class AeronDriver(config: Configuration, val logger: Logger, val endPoint: EndPoint<*>?) {
|
||||
|
||||
companion object {
|
||||
/**
|
||||
|
@ -869,22 +869,21 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
/**
|
||||
* Since the publication is not connected, we weren't able to send data to the remote endpoint.
|
||||
*/
|
||||
val endPoint = endPoint!!
|
||||
if (result == Publication.NOT_CONNECTED) {
|
||||
if (abortEarly) {
|
||||
val exception = endPoint!!.newException(
|
||||
"[${publication.sessionId()}] Unable to send message. (Connection in non-connected state, aborted attempt! ${
|
||||
AeronDriver.errorCodeName(result)
|
||||
})"
|
||||
val exception = endPoint.newException(
|
||||
"[${publication.sessionId()}] Unable to send message. (Connection in non-connected state, aborted attempt! ${errorCodeName(result)})"
|
||||
)
|
||||
listenerManager.notifyError(exception)
|
||||
return false
|
||||
}
|
||||
else if (publication.isConnected) {
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
val errorMessage = "[${publication.sessionId()}] Error sending message. (Connection in non-connected state longer than linger timeout. ${AeronDriver.errorCodeName(result)})"
|
||||
val errorMessage = "[${publication.sessionId()}] Error sending message. (Connection in non-connected state longer than linger timeout. ${errorCodeName(result)})"
|
||||
|
||||
// either client or server. No other choices. We create an exception, because it's more useful!
|
||||
val exception = endPoint!!.newException(errorMessage)
|
||||
val exception = endPoint.newException(errorMessage)
|
||||
|
||||
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
|
||||
// where we see who is calling "send()"
|
||||
|
@ -897,8 +896,12 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
internal.mustRestartDriverOnError = true
|
||||
|
||||
// publication was actually closed or the server was closed, so no bother throwing an error
|
||||
connection.closeImmediately(sendDisconnectMessage = false,
|
||||
notifyDisconnect = true)
|
||||
connection.closeImmediately(
|
||||
sendDisconnectMessage = false,
|
||||
notifyDisconnect = true,
|
||||
closeEverything = false
|
||||
)
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -926,21 +929,22 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
|
||||
// send the message.
|
||||
|
||||
if (!endPoint!!.shutdownInProgress.value) {
|
||||
if (!endPoint.shutdownInProgress.value && !endPoint.sessionManager.enabled()) {
|
||||
// we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
|
||||
// additionally, if we are managing pending messages, don't show an error (since the message will be queued to send again)
|
||||
val exception = endPoint.newException(
|
||||
"[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${errorCodeName(result)})"
|
||||
)
|
||||
).cleanStackTrace(5)
|
||||
listenerManager.notifyError(exception)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
val errorMessage = "[${publication.sessionId()}] Error sending message. (${AeronDriver.errorCodeName(result)})"
|
||||
val errorMessage = "[${publication.sessionId()}] Error sending message. (${errorCodeName(result)})"
|
||||
|
||||
// either client or server. No other choices. We create an exception, because it's more useful!
|
||||
val exception = endPoint!!.newException(errorMessage)
|
||||
val exception = endPoint.newException(errorMessage)
|
||||
|
||||
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
|
||||
// where we see who is calling "send()"
|
||||
|
@ -990,11 +994,12 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
* According to Aeron Docs, Pubs and Subs can "come and go", whatever that means. We just want to make sure that we
|
||||
* don't "loop forever" if a publication is ACTUALLY closed, like on purpose.
|
||||
*/
|
||||
val endPoint = endPoint!!
|
||||
if (result == Publication.NOT_CONNECTED) {
|
||||
if (publication.isConnected) {
|
||||
// more critical error sending the message. we shouldn't retry or anything.
|
||||
// this exception will be a ClientException or a ServerException
|
||||
val exception = endPoint!!.newException(
|
||||
val exception = endPoint.newException(
|
||||
"[$logInfo] Error sending message. (Connection in non-connected state longer than linger timeout. ${errorCodeName(result)})",
|
||||
null
|
||||
)
|
||||
|
@ -1027,10 +1032,13 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
}
|
||||
|
||||
if (result == Publication.CLOSED) {
|
||||
// this can happen when we use RMI to close a connection. RMI will (in most cases) ALWAYS send a response when it's
|
||||
// done executing. If the connection is *closed* first (because an RMI method closed it), then we will not be able to
|
||||
// send the message.
|
||||
|
||||
if (!endPoint!!.shutdownInProgress.value) {
|
||||
if (!endPoint.shutdownInProgress.value && !endPoint.sessionManager.enabled()) {
|
||||
// we already know the connection is closed. we closed it (so it doesn't make sense to emit an error about this)
|
||||
|
||||
// additionally, if we are managing pending messages, don't show an error (since the message will be queued to send again)
|
||||
val exception = endPoint.newException(
|
||||
"[${publication.sessionId()}] Unable to send message. (Connection is closed, aborted attempt! ${errorCodeName(result)})"
|
||||
)
|
||||
|
@ -1043,7 +1051,7 @@ class AeronDriver constructor(config: Configuration, val logger: Logger, val end
|
|||
val errorMessage = "[${publication.sessionId()}] Error sending message. (${errorCodeName(result)})"
|
||||
|
||||
// either client or server. No other choices. We create an exception, because it's more useful!
|
||||
val exception = endPoint!!.newException(errorMessage)
|
||||
val exception = endPoint.newException(errorMessage)
|
||||
|
||||
// +3 more because we do not need to see the "internals" for sending messages. The important part of the stack trace is
|
||||
// where we see who is calling "send()"
|
||||
|
|
|
@ -18,6 +18,7 @@ package dorkbox.network.connection
|
|||
import dorkbox.network.Client
|
||||
import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator
|
||||
import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
import dorkbox.network.ping.Ping
|
||||
import dorkbox.network.rmi.RmiSupportConnection
|
||||
import io.aeron.Image
|
||||
|
@ -195,7 +196,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||
*
|
||||
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
|
||||
*/
|
||||
internal fun send(message: Any, abortEarly: Boolean): Boolean {
|
||||
internal open fun send(message: Any, abortEarly: Boolean): Boolean {
|
||||
if (logger.isTraceEnabled) {
|
||||
// The handshake sessionId IS NOT globally unique
|
||||
// don't automatically create the lambda when trace is disabled! Because this uses 'outside' scoped info, it's a new lambda each time!
|
||||
|
@ -316,13 +317,14 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||
*/
|
||||
fun close() {
|
||||
close(sendDisconnectMessage = true,
|
||||
notifyDisconnect = true)
|
||||
notifyDisconnect = true,
|
||||
closeEverything = true)
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the connection, and removes all connection specific listeners
|
||||
*/
|
||||
internal fun close(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean) {
|
||||
internal fun close(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean, closeEverything: Boolean) {
|
||||
// there are 2 ways to call close.
|
||||
// MANUALLY
|
||||
// When a connection is disconnected via a timeout/expire.
|
||||
|
@ -331,14 +333,17 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||
|
||||
// make sure that EVERYTHING before "close()" runs before we do
|
||||
EventDispatcher.launchSequentially(EventDispatcher.CLOSE) {
|
||||
closeImmediately(sendDisconnectMessage, notifyDisconnect)
|
||||
closeImmediately(
|
||||
sendDisconnectMessage = sendDisconnectMessage,
|
||||
notifyDisconnect = notifyDisconnect,
|
||||
closeEverything = closeEverything)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// connection.close() -> this
|
||||
// endpoint.close() -> connection.close() -> this
|
||||
internal fun closeImmediately(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean) {
|
||||
internal fun closeImmediately(sendDisconnectMessage: Boolean, notifyDisconnect: Boolean, closeEverything: Boolean) {
|
||||
// the server 'handshake' connection info is cleaned up with the disconnect via timeout/expire.
|
||||
if (!isClosed.compareAndSet(expect = false, update = true)) {
|
||||
return
|
||||
|
@ -348,6 +353,11 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||
logger.debug("[$toString0] connection closing")
|
||||
}
|
||||
|
||||
// make sure to save off the RMI objects for session management
|
||||
if (!closeEverything && endPoint.sessionManager.enabled()) {
|
||||
endPoint.sessionManager.onDisconnect(this as SessionConnection)
|
||||
}
|
||||
|
||||
// on close, we want to make sure this file is DELETED!
|
||||
endPoint.aeronDriver.close(subscription, toString0)
|
||||
|
||||
|
@ -359,7 +369,11 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
|
|||
}
|
||||
|
||||
// sometimes the remote end has already disconnected, THERE WILL BE ERRORS if this happens (but they are ok)
|
||||
send(DisconnectMessage.INSTANCE, true)
|
||||
if (closeEverything) {
|
||||
send(DisconnectMessage.CLOSE_EVERYTHING, true)
|
||||
} else {
|
||||
send(DisconnectMessage.CLOSE_FOR_SESSION, true)
|
||||
}
|
||||
}
|
||||
|
||||
// on close, we want to make sure this file is DELETED!
|
||||
|
|
|
@ -71,6 +71,7 @@ internal class CryptoManagement(val logger: Logger,
|
|||
val privateKey: XECPrivateKey
|
||||
val publicKey: XECPublicKey
|
||||
|
||||
// These are both 32 bytes long (256 bits)
|
||||
val privateKeyBytes: ByteArray
|
||||
val publicKeyBytes: ByteArray
|
||||
|
||||
|
@ -181,6 +182,8 @@ internal class CryptoManagement(val logger: Logger,
|
|||
val streamIdPub = cryptInput.readInt()
|
||||
val streamIdSub = cryptInput.readInt()
|
||||
val regDetailsSize = cryptInput.readInt()
|
||||
val enableSession = cryptInput.readBoolean()
|
||||
val sessionTimeout = cryptInput.readLong()
|
||||
val regDetails = cryptInput.readBytes(regDetailsSize)
|
||||
|
||||
// now save data off
|
||||
|
@ -190,16 +193,22 @@ internal class CryptoManagement(val logger: Logger,
|
|||
streamIdPub = streamIdPub,
|
||||
streamIdSub = streamIdSub,
|
||||
publicKey = serverPublicKeyBytes,
|
||||
enableSession = enableSession,
|
||||
sessionTimeout = sessionTimeout,
|
||||
kryoRegistrationDetails = regDetails,
|
||||
secretKey = secretKey)
|
||||
}
|
||||
|
||||
// NOTE: ALWAYS CALLED ON THE SAME THREAD! (from the server, mutually exclusive calls to decrypt)
|
||||
fun nocrypt(sessionIdPub: Int,
|
||||
sessionIdSub: Int,
|
||||
streamIdPub: Int,
|
||||
streamIdSub: Int,
|
||||
kryoRegDetails: ByteArray): ByteArray {
|
||||
fun nocrypt(
|
||||
sessionIdPub: Int,
|
||||
sessionIdSub: Int,
|
||||
streamIdPub: Int,
|
||||
streamIdSub: Int,
|
||||
enableSession: Boolean,
|
||||
sessionTimeout: Long,
|
||||
kryoRegDetails: ByteArray
|
||||
): ByteArray {
|
||||
|
||||
return try {
|
||||
// now create the byte array that holds all our data
|
||||
|
@ -209,6 +218,8 @@ internal class CryptoManagement(val logger: Logger,
|
|||
cryptOutput.writeInt(streamIdPub)
|
||||
cryptOutput.writeInt(streamIdSub)
|
||||
cryptOutput.writeInt(kryoRegDetails.size)
|
||||
cryptOutput.writeBoolean(enableSession)
|
||||
cryptOutput.writeLong(sessionTimeout)
|
||||
cryptOutput.writeBytes(kryoRegDetails)
|
||||
|
||||
cryptOutput.toBytes()
|
||||
|
@ -258,6 +269,8 @@ internal class CryptoManagement(val logger: Logger,
|
|||
sessionIdSub: Int,
|
||||
streamIdPub: Int,
|
||||
streamIdSub: Int,
|
||||
enableSession: Boolean,
|
||||
sessionTimeout: Long,
|
||||
kryoRegDetails: ByteArray
|
||||
): ByteArray {
|
||||
|
||||
|
@ -274,6 +287,8 @@ internal class CryptoManagement(val logger: Logger,
|
|||
cryptOutput.writeInt(streamIdPub)
|
||||
cryptOutput.writeInt(streamIdSub)
|
||||
cryptOutput.writeInt(kryoRegDetails.size)
|
||||
cryptOutput.writeBoolean(enableSession)
|
||||
cryptOutput.writeLong(sessionTimeout)
|
||||
cryptOutput.writeBytes(kryoRegDetails)
|
||||
|
||||
return iv + aesCipher.doFinal(cryptOutput.toBytes())
|
||||
|
|
|
@ -24,6 +24,8 @@ import dorkbox.network.ServerConfiguration
|
|||
import dorkbox.network.aeron.AeronDriver
|
||||
import dorkbox.network.aeron.BacklogStat
|
||||
import dorkbox.network.aeron.EventPoller
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
import dorkbox.network.connection.session.SessionManager
|
||||
import dorkbox.network.connection.streaming.StreamingControl
|
||||
import dorkbox.network.connection.streaming.StreamingData
|
||||
import dorkbox.network.connection.streaming.StreamingManager
|
||||
|
@ -178,6 +180,9 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
|
||||
private val streamingManager = StreamingManager<CONNECTION>(logger, config)
|
||||
|
||||
internal lateinit var sessionManager: SessionManager<SessionConnection>
|
||||
|
||||
|
||||
/**
|
||||
* The primary machine port that the server will listen for connections on
|
||||
*/
|
||||
|
@ -520,7 +525,7 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
sendIdleStrategy.reset()
|
||||
|
||||
// A kryo instance CANNOT be re-used until after it's buffer is flushed to the network!
|
||||
return try {
|
||||
val success = try {
|
||||
// since ANY thread can call 'send', we have to take kryo instances in a safe way
|
||||
val kryo = serialization.take()
|
||||
try {
|
||||
|
@ -575,6 +580,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
|
||||
false
|
||||
}
|
||||
|
||||
return success
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -618,10 +625,11 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
// IF we get this message in time, then we do not have to wait for the connection to expire before closing it
|
||||
is DisconnectMessage -> {
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("Received disconnect message from $otherTypeName")
|
||||
logger.debug("Received disconnect message from: $otherTypeName")
|
||||
}
|
||||
connection.close(sendDisconnectMessage = false,
|
||||
notifyDisconnect = true)
|
||||
notifyDisconnect = true,
|
||||
closeEverything = message.closeEverything)
|
||||
}
|
||||
|
||||
// streaming message. This is used when the published data is too large for a single Aeron message.
|
||||
|
@ -937,7 +945,8 @@ abstract class EndPoint<CONNECTION : Connection> private constructor(val type: C
|
|||
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
|
||||
connections.forEach {
|
||||
it.closeImmediately(sendDisconnectMessage = true,
|
||||
notifyDisconnect = notifyDisconnect)
|
||||
notifyDisconnect = notifyDisconnect,
|
||||
closeEverything = closeEverything)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dorkbox.network.connection.session
|
||||
|
||||
import dorkbox.hex.toHexString
|
||||
import dorkbox.network.aeron.AeronDriver
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.rmi.RemoteObject
|
||||
import dorkbox.network.rmi.RmiClient
|
||||
import kotlinx.atomicfu.locks.ReentrantLock
|
||||
import kotlinx.atomicfu.locks.withLock
|
||||
import java.lang.reflect.Proxy
|
||||
import java.util.concurrent.*
|
||||
|
||||
class Session<CONNECTION: Connection>(val aeronDriver: AeronDriver) {
|
||||
// the RMI objects are saved when the connection is removed, and restored BEFORE the connection is initialized, so there are no concerns
|
||||
// regarding the collision of RMI IDs and objects
|
||||
private val lock = ReentrantLock()
|
||||
private var oldProxyObjects: List<RemoteObject<*>>? = null
|
||||
private var oldImplObjects: List<Pair<Int, Any>>? = null
|
||||
|
||||
/**
|
||||
* Only used when configured. Will re-send all missing messages to a connection when a connection re-connects.
|
||||
*/
|
||||
internal val pendingMessagesQueue: LinkedTransferQueue<Any> = LinkedTransferQueue()
|
||||
|
||||
@Volatile
|
||||
private var connection: CONNECTION? = null
|
||||
|
||||
|
||||
fun queueMessage(connection: SessionConnection, message: Any, abortEarly: Boolean): Boolean {
|
||||
val existingConnection = this.connection
|
||||
|
||||
if (existingConnection != null && existingConnection != connection) {
|
||||
// we received a message on an OLD connection (which is no longer connected ---- BUT we have a NEW connection that is connected)
|
||||
val success = existingConnection.send(message, abortEarly)
|
||||
if (success) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!abortEarly) {
|
||||
// this was a "normal" send (instead of the disconnect message).
|
||||
pendingMessagesQueue.put(message)
|
||||
}
|
||||
else if (aeronDriver.internal.mustRestartDriverOnError) {
|
||||
// the only way we get errors, is if the connection is bad OR if we are sending so fast that the connection cannot keep up.
|
||||
|
||||
// don't restart/reconnect -- there was an internal network error
|
||||
pendingMessagesQueue.put(message)
|
||||
}
|
||||
else if (existingConnection == null || !existingConnection.isConnected()) {
|
||||
// there was an issue - the connection should automatically reconnect
|
||||
pendingMessagesQueue.put(message)
|
||||
}
|
||||
|
||||
// we couldn't send the message, but we did queue it if possible
|
||||
return false
|
||||
}
|
||||
|
||||
fun restore(connection: CONNECTION) {
|
||||
connection.logger.error("RESTORING RMI OBJECTS for ${connection.uuid.toHexString()}")
|
||||
this.connection = connection
|
||||
|
||||
lock.withLock {
|
||||
// this is called, even on a brand-new session, so we must have extra checks in place.
|
||||
val rmi = connection.rmi
|
||||
if (oldProxyObjects != null) {
|
||||
rmi.recreateProxyObjects(oldProxyObjects!!)
|
||||
oldProxyObjects = null
|
||||
}
|
||||
if (oldImplObjects != null) {
|
||||
rmi.restoreAllImplObjects(oldImplObjects!!)
|
||||
oldImplObjects = null
|
||||
}
|
||||
}
|
||||
|
||||
// now send all pending messages
|
||||
connection.logger.error("Sending pending messages: ${pendingMessagesQueue.size}")
|
||||
pendingMessagesQueue.forEach {
|
||||
connection.send(it, false)
|
||||
}
|
||||
}
|
||||
|
||||
fun save(connection: CONNECTION) {
|
||||
connection.logger.error("BACKING UP RMI OBJECTS: ${connection.uuid.toHexString()}")
|
||||
|
||||
val allProxyObjects = connection.rmi.getAllProxyObjects()
|
||||
val allImplObjects = connection.rmi.getAllImplObjects()
|
||||
|
||||
allProxyObjects.forEach {
|
||||
val rmiClient = Proxy.getInvocationHandler(it) as RmiClient
|
||||
val rmiId = rmiClient.rmiObjectId
|
||||
|
||||
connection.logger.error("PROXY: $rmiId")
|
||||
}
|
||||
|
||||
allImplObjects.forEach {
|
||||
connection.logger.error("IMPL: ${it.first} : ${it.second.javaClass}")
|
||||
}
|
||||
|
||||
// we want to save all the connection RMI objects, so they can be recreated on connect
|
||||
lock.withLock {
|
||||
oldProxyObjects = allProxyObjects
|
||||
oldImplObjects = allImplObjects
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dorkbox.network.connection.session
|
||||
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.connection.ConnectionParams
|
||||
|
||||
open class SessionConnection(connectionParameters: ConnectionParams<*>): Connection(connectionParameters) {
|
||||
@Volatile
|
||||
internal lateinit var manager: Session<*>
|
||||
|
||||
override fun send(message: Any, abortEarly: Boolean): Boolean {
|
||||
val success = super.send(message, abortEarly)
|
||||
if (!success) {
|
||||
return manager.queueMessage(this, message, abortEarly)
|
||||
}
|
||||
|
||||
return success
|
||||
}
|
||||
|
||||
fun sendPendingMessages() {
|
||||
manager.pendingMessagesQueue.forEach {
|
||||
send(it, false)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dorkbox.network.connection.session
|
||||
|
||||
import dorkbox.bytes.ByteArrayWrapper
|
||||
import dorkbox.collections.LockFreeHashMap
|
||||
import dorkbox.hex.toHexString
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.aeron.AeronDriver
|
||||
import dorkbox.network.connection.EndPoint
|
||||
import dorkbox.util.Sys
|
||||
import net.jodah.expiringmap.ExpirationPolicy
|
||||
import net.jodah.expiringmap.ExpiringMap
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.concurrent.*
|
||||
|
||||
internal open class SessionManager<CONNECTION: SessionConnection>(config: Configuration, val aeronDriver: AeronDriver, sessionTimeout: Long) {
|
||||
|
||||
companion object {
|
||||
private val logger = LoggerFactory.getLogger(SessionManager::class.java.simpleName)
|
||||
|
||||
class NoOp<CONNECTION: SessionConnection>(config: Configuration, aeronDriver: AeronDriver):
|
||||
SessionManager<CONNECTION>(config, aeronDriver, 0L) {
|
||||
|
||||
override fun enabled(): Boolean {
|
||||
return false
|
||||
}
|
||||
|
||||
override fun onInit(connection: CONNECTION): Boolean {
|
||||
// do nothing
|
||||
return true
|
||||
}
|
||||
|
||||
override fun onDisconnect(connection: CONNECTION) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private val sessions = LockFreeHashMap<ByteArrayWrapper, Session<CONNECTION>>()
|
||||
|
||||
|
||||
// note: the expire time here is a 4x longer than the expire time in the client, this way we can adjust for network lag or quick reconnects
|
||||
private val expiringSessions = ExpiringMap.builder()
|
||||
.apply {
|
||||
// connections are extremely difficult to diagnose when the connection timeout is short
|
||||
val timeUnit = if (EndPoint.DEBUG_CONNECTIONS) { TimeUnit.HOURS } else { TimeUnit.NANOSECONDS }
|
||||
|
||||
// we MUST include the publication linger timeout, otherwise we might encounter problems that are NOT REALLY problems
|
||||
this.expiration(TimeUnit.SECONDS.toNanos(config.connectionCloseTimeoutInSeconds.toLong() * 2) + aeronDriver.lingerNs(), timeUnit)
|
||||
}
|
||||
.expirationPolicy(ExpirationPolicy.CREATED)
|
||||
.expirationListener<ByteArrayWrapper, Session<CONNECTION>> { publicKeyWrapped, _ ->
|
||||
// this blocks until it fully runs (which is ok. this is fast)
|
||||
logger.debug("Connection session for ${publicKeyWrapped.bytes.toHexString()} expired.")
|
||||
|
||||
}
|
||||
.build<ByteArrayWrapper, Session<CONNECTION>>()
|
||||
|
||||
|
||||
init {
|
||||
// ignore 0
|
||||
val check = TimeUnit.SECONDS.toNanos(sessionTimeout)
|
||||
val lingerNs = aeronDriver.lingerNs()
|
||||
val required = TimeUnit.SECONDS.toNanos(config.connectionCloseTimeoutInSeconds.toLong())
|
||||
require(check == 0L || check > required + lingerNs) {
|
||||
"The session timeout (${Sys.getTimePretty(check)}) must be longer than the connection close timeout (${Sys.getTimePretty(required)}) + the aeron driver linger timeout (${Sys.getTimePretty(lingerNs)})!"
|
||||
}
|
||||
}
|
||||
|
||||
open fun enabled(): Boolean {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* this must be called when a new connection is created AND when the internal `reconnect` occurs (as a result of a network error)
|
||||
*
|
||||
* @return true if this is a new session, false if it is an existing session
|
||||
*/
|
||||
open fun onInit(connection: CONNECTION): Boolean {
|
||||
val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid)
|
||||
|
||||
var isNewSession = false
|
||||
val session = synchronized(sessions) {
|
||||
// always check if we are expiring first...
|
||||
val expiring = expiringSessions.remove(publicKeyWrapped)
|
||||
if (expiring != null) {
|
||||
expiring
|
||||
} else {
|
||||
val existing = sessions[publicKeyWrapped]
|
||||
if (existing != null) {
|
||||
existing
|
||||
} else {
|
||||
isNewSession = true
|
||||
val newSession = Session<CONNECTION>(aeronDriver)
|
||||
sessions[publicKeyWrapped] = newSession
|
||||
newSession
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connection.manager = session
|
||||
session.restore(connection)
|
||||
|
||||
return isNewSession
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Always called when a connection is disconnected from the network
|
||||
*/
|
||||
open fun onDisconnect(connection: CONNECTION) {
|
||||
val publicKeyWrapped = ByteArrayWrapper.wrap(connection.uuid)
|
||||
|
||||
val session = synchronized(sessions) {
|
||||
val session = sessions.remove(publicKeyWrapped)
|
||||
// we want to expire this session after XYZ time
|
||||
expiringSessions[publicKeyWrapped] = session
|
||||
session
|
||||
}
|
||||
|
||||
|
||||
session!!.save(connection)
|
||||
}
|
||||
}
|
|
@ -23,6 +23,8 @@ internal class ClientConnectionInfo(
|
|||
val streamIdPub: Int,
|
||||
val streamIdSub: Int = 0,
|
||||
val publicKey: ByteArray = ByteArray(0),
|
||||
val enableSession: Boolean,
|
||||
val sessionTimeout: Long,
|
||||
val kryoRegistrationDetails: ByteArray,
|
||||
val secretKey: SecretKeySpec
|
||||
)
|
||||
|
|
|
@ -53,6 +53,7 @@ class RandomId65kAllocator(private val min: Int, max: Int) {
|
|||
maxAssignments = (max - min).coerceIn(1, max65k)
|
||||
|
||||
// create a shuffled list of ID's. This operation is ONLY performed ONE TIME per endpoint!
|
||||
// Boxing the Ints here is OK, because they are boxed in the cache as well (so it doesn't matter).
|
||||
val ids = ArrayList<Int>(maxAssignments)
|
||||
for (id in min until min + maxAssignments) {
|
||||
ids.add(id)
|
||||
|
|
|
@ -21,6 +21,7 @@ import dorkbox.network.aeron.AeronDriver
|
|||
import dorkbox.network.aeron.AeronDriver.Companion.sessionIdAllocator
|
||||
import dorkbox.network.aeron.AeronDriver.Companion.streamIdAllocator
|
||||
import dorkbox.network.connection.*
|
||||
import dorkbox.network.connection.session.SessionConnection
|
||||
import dorkbox.network.exceptions.AllocationException
|
||||
import dorkbox.network.exceptions.ServerHandshakeException
|
||||
import dorkbox.network.exceptions.ServerTimedoutException
|
||||
|
@ -46,8 +47,6 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
val aeronDriver: AeronDriver
|
||||
) {
|
||||
|
||||
|
||||
|
||||
// note: the expire time here is a LITTLE longer than the expire time in the client, this way we can adjust for network lag if it's close
|
||||
private val pendingConnections = ExpiringMap.builder()
|
||||
.apply {
|
||||
|
@ -123,24 +122,33 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
|
||||
// check to see if this is a pending connection
|
||||
if (message.state == HandshakeMessage.DONE) {
|
||||
val existingConnection = pendingConnections.remove(message.connectKey)
|
||||
if (existingConnection == null) {
|
||||
val newConnection = pendingConnections.remove(message.connectKey)
|
||||
if (newConnection == null) {
|
||||
listenerManager.notifyError(ServerHandshakeException("[?????] (${message.connectKey}) Error! Pending connection from client was null, and cannot complete handshake!"))
|
||||
return true
|
||||
}
|
||||
|
||||
// Server is the "source", client mirrors the server
|
||||
if (logger.isDebugEnabled) {
|
||||
logger.debug("[${existingConnection}] (${message.connectKey}) Connection done with handshake.")
|
||||
logger.debug("[${newConnection}] (${message.connectKey}) Connection done with handshake.")
|
||||
}
|
||||
|
||||
existingConnection.setImage()
|
||||
newConnection.setImage()
|
||||
|
||||
// in the specific case of using sessions, we don't want to call 'init' or `connect` for a connection that is resuming a session
|
||||
var newSession = true
|
||||
if (server.sessionManager.enabled()) {
|
||||
newSession = server.sessionManager.onInit(newConnection as SessionConnection)
|
||||
}
|
||||
|
||||
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls
|
||||
if (newSession) {
|
||||
listenerManager.notifyInit(newConnection)
|
||||
}
|
||||
|
||||
// before we finish creating the connection, we initialize it (in case there needs to be logic that happens-before `onConnect` calls occur
|
||||
listenerManager.notifyInit(existingConnection)
|
||||
|
||||
// this enables the connection to start polling for messages
|
||||
server.addConnection(existingConnection)
|
||||
server.addConnection(newConnection)
|
||||
|
||||
// now tell the client we are done
|
||||
try {
|
||||
|
@ -148,9 +156,13 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
logInfo,
|
||||
HandshakeMessage.doneToClient(message.connectKey))
|
||||
|
||||
listenerManager.notifyConnect(existingConnection)
|
||||
if (newSession) {
|
||||
listenerManager.notifyConnect(newConnection)
|
||||
} else {
|
||||
(newConnection as SessionConnection).sendPendingMessages()
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
listenerManager.notifyError(existingConnection, TransmitException("[$existingConnection] Handshake error", e))
|
||||
listenerManager.notifyError(newConnection, TransmitException("[$newConnection] Handshake error", e))
|
||||
}
|
||||
|
||||
return false
|
||||
|
@ -371,11 +383,14 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
|
||||
// now create the encrypted payload, using no crypto
|
||||
successMessage.registrationData = server.crypto.nocrypt(
|
||||
connectionSessionIdPub,
|
||||
connectionSessionIdSub,
|
||||
connectionStreamIdPub,
|
||||
connectionStreamIdSub,
|
||||
serialization.getKryoRegistrationDetails())
|
||||
sessionIdPub = connectionSessionIdPub,
|
||||
sessionIdSub = connectionSessionIdSub,
|
||||
streamIdPub = connectionStreamIdPub,
|
||||
streamIdSub = connectionStreamIdSub,
|
||||
enableSession = config.enableSessionManagement,
|
||||
sessionTimeout = config.sessionTimeoutSeconds,
|
||||
kryoRegDetails = serialization.getKryoRegistrationDetails()
|
||||
)
|
||||
|
||||
successMessage.publicKey = server.crypto.publicKeyBytes
|
||||
|
||||
|
@ -612,6 +627,8 @@ internal class ServerHandshake<CONNECTION : Connection>(
|
|||
sessionIdSub = connectionSessionIdSub,
|
||||
streamIdPub = connectionStreamIdPub,
|
||||
streamIdSub = connectionStreamIdSub,
|
||||
enableSession = config.enableSessionManagement,
|
||||
sessionTimeout = config.sessionTimeoutSeconds,
|
||||
kryoRegDetails = serialization.getKryoRegistrationDetails()
|
||||
)
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -20,7 +20,7 @@ import com.esotericsoftware.kryo.Serializer
|
|||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
|
||||
class PingSerializer: Serializer<Ping>() {
|
||||
internal class PingSerializer: Serializer<Ping>() {
|
||||
override fun write(kryo: Kryo, output: Output, ping: Ping) {
|
||||
output.writeInt(ping.packedId)
|
||||
output.writeLong(ping.pingTime)
|
||||
|
|
|
@ -59,7 +59,7 @@ import kotlin.concurrent.write
|
|||
*
|
||||
* @author Nathan Robinson
|
||||
*/
|
||||
internal class RemoteObjectStorage(val logger: Logger) {
|
||||
class RemoteObjectStorage(val logger: Logger) {
|
||||
|
||||
companion object {
|
||||
const val INVALID_RMI = 0
|
||||
|
@ -77,107 +77,41 @@ internal class RemoteObjectStorage(val logger: Logger) {
|
|||
// 2) specifically request a number
|
||||
// To solve this, we use 3 data structures, because it's also possible to RETURN no-longer needed object ID's (like when a connection closes)
|
||||
private var objectIdCounter: Int = 1
|
||||
private val reservedObjectIds = IntArrayList(1, INVALID_RMI)
|
||||
private val objectIds = IntArrayList(16, INVALID_RMI)
|
||||
|
||||
init {
|
||||
(0..8).forEach { _ ->
|
||||
objectIds.addInt(objectIdCounter++)
|
||||
}
|
||||
}
|
||||
|
||||
private fun validate(objectId: Int) {
|
||||
require(objectId > 0) { "The ID must be greater than 0" }
|
||||
require(objectId <= 65535) { "The ID must be less than 65,535" }
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the next ID or 0 (INVALID_RMI, if it's invalid)
|
||||
*/
|
||||
private fun unsafeNextId(): Int {
|
||||
val id = if (objectIds.size > 0) {
|
||||
objectIds.removeAt(objectIds.size - 1)
|
||||
} else {
|
||||
objectIdCounter++
|
||||
}
|
||||
|
||||
if (objectIdCounter > 65535) {
|
||||
// basically, it's a short (but collections are a LOT easier to deal with if it's an int)
|
||||
val msg = "Max ID size is 65535, because of how we pack the bytes when sending RMI messages. FATAL ERROR! (too many objects)"
|
||||
logger.error(msg)
|
||||
return INVALID_RMI
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the next possible RMI object ID. Either one that is next available, or 0 (INVALID_RMI) if it was invalid
|
||||
*/
|
||||
fun nextId(): Int {
|
||||
idLock.write {
|
||||
var idToReturn = unsafeNextId()
|
||||
while (reservedObjectIds.contains(idToReturn)) {
|
||||
idToReturn = unsafeNextId()
|
||||
}
|
||||
|
||||
return idToReturn
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reserves an ID so that other requests for ID's will never return this ID. The number must be > 0 and < 65535
|
||||
*
|
||||
* Reservations are permanent and it will ALWAYS be reserved! You cannot "un-reserve" an ID.
|
||||
*
|
||||
* If you care about memory and performance, use the ID from "nextId()" instead.
|
||||
*
|
||||
* @return false if this ID was not able to be reserved
|
||||
*/
|
||||
fun reserveId(id: Int): Boolean {
|
||||
validate(id)
|
||||
|
||||
idLock.write {
|
||||
val contains = objectIds.remove(id)
|
||||
if (contains) {
|
||||
// this id is available for us to use (and was temporarily used before)
|
||||
return true
|
||||
}
|
||||
|
||||
if (reservedObjectIds.contains(id)) {
|
||||
// this id is ALREADY used by something else
|
||||
return false
|
||||
}
|
||||
|
||||
if (objectIdCounter < id) {
|
||||
// this id is ALREADY used by something else
|
||||
return false
|
||||
}
|
||||
|
||||
if (objectIdCounter == id) {
|
||||
// we are available via the counter, so make sure the counter increments
|
||||
val id = if (objectIds.size > 0) {
|
||||
objectIds.removeAt(objectIds.size - 1)
|
||||
} else {
|
||||
objectIdCounter++
|
||||
// we still want to mark this as reserved, so fall through
|
||||
}
|
||||
|
||||
// this means that the counter is LARGER than the id (maybe even a LOT larger)
|
||||
// we just stuff this requested number in a small array and check it whenever we get a new number
|
||||
reservedObjectIds.add(id)
|
||||
return true
|
||||
if (objectIdCounter > 65535) {
|
||||
// basically, it's a short (but collections are a LOT easier to deal with if it's an int)
|
||||
val msg = "Max ID size is 65535, because of how we pack the bytes when sending RMI messages. FATAL ERROR! (too many objects)"
|
||||
logger.error(msg)
|
||||
return INVALID_RMI
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return an ID to be used again. Reserved IDs will not be allowed to be returned
|
||||
*/
|
||||
fun returnId(id: Int) {
|
||||
idLock.write {
|
||||
if (reservedObjectIds.contains(id)) {
|
||||
logger.error("Do not return a reserved ID ($id). Once an ID is reserved, it is permanent.")
|
||||
return
|
||||
}
|
||||
|
||||
val shortCheck: Int = (id + 1)
|
||||
if (shortCheck == objectIdCounter) {
|
||||
objectIdCounter--
|
||||
|
@ -188,9 +122,6 @@ internal class RemoteObjectStorage(val logger: Logger) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Automatically registers an object with the next available ID to allow a remote connection to access this object via the returned ID
|
||||
*
|
||||
|
@ -200,7 +131,7 @@ internal class RemoteObjectStorage(val logger: Logger) {
|
|||
// this will return INVALID_RMI if there are too many in the ObjectSpace
|
||||
val nextObjectId = nextId()
|
||||
if (nextObjectId != INVALID_RMI) {
|
||||
objectMap.put(nextObjectId, `object`)
|
||||
objectMap[nextObjectId] = `object`
|
||||
|
||||
if (logger.isTraceEnabled) {
|
||||
logger.trace("Remote object <proxy:$nextObjectId> registered with .toString() = '${`object`}'")
|
||||
|
@ -220,7 +151,7 @@ internal class RemoteObjectStorage(val logger: Logger) {
|
|||
fun register(`object`: Any, objectId: Int): Boolean {
|
||||
validate(objectId)
|
||||
|
||||
objectMap.put(objectId, `object`)
|
||||
objectMap[objectId] = `object`
|
||||
|
||||
if (logger.isTraceEnabled) {
|
||||
logger.trace("Remote object <proxy:$objectId> registered with .toString() = '${`object`}'")
|
||||
|
@ -279,6 +210,36 @@ internal class RemoteObjectStorage(val logger: Logger) {
|
|||
return objectMap.inverse()[remoteObject]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return all the saved RMI implementation objects along with their RMI ID. This is so we can restore these later on
|
||||
*/
|
||||
fun getAll(): List<Pair<Int, Any>> {
|
||||
return objectMap.entries.map { it -> Pair(it.key, it.value) }.toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* @return all the saved RMI implementation objects along with their RMI ID. This is so we can restore these later on
|
||||
*/
|
||||
fun restoreAll(implObjects: List<Pair<Int, Any>>) {
|
||||
idLock.write {
|
||||
// this is a bit slow, but we have to re-inject objects. THIS happens before the connection is initialized, so we know
|
||||
// these RMI ids are available
|
||||
|
||||
implObjects.forEach {
|
||||
objectMap.remove(it.first)
|
||||
}
|
||||
|
||||
objectIdCounter += implObjects.size
|
||||
}
|
||||
|
||||
|
||||
// now we have to put our items back into the backing map.
|
||||
implObjects.forEach {
|
||||
objectMap[it.first] = it.second
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
objectMap.clear()
|
||||
}
|
||||
|
|
|
@ -224,6 +224,24 @@ internal class ResponseManager(maxValuesInCache: Int = 65534, minimumValue: Int
|
|||
return resultOrWaiter
|
||||
}
|
||||
|
||||
fun abort(responseWaiter: ResponseWaiter, logger: Logger) {
|
||||
val id = RmiUtils.unpackUnsignedRight(responseWaiter.id)
|
||||
|
||||
if (logger.isTraceEnabled) {
|
||||
logger.trace("[RM] abort: $id")
|
||||
}
|
||||
|
||||
// deletes the entry in the map
|
||||
pendingLock.write {
|
||||
pending[id] = null
|
||||
}
|
||||
|
||||
// always return the waiter to the cache
|
||||
responseWaiter.result = null
|
||||
waiterCache.put(responseWaiter)
|
||||
rmiWaitersInUse.getAndDecrement()
|
||||
}
|
||||
|
||||
fun close() {
|
||||
// technically, this isn't closing it, so much as it's cleaning it out
|
||||
if (logger.isDebugEnabled) {
|
||||
|
|
|
@ -17,11 +17,9 @@ package dorkbox.network.rmi
|
|||
|
||||
import kotlinx.atomicfu.locks.withLock
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.locks.*
|
||||
|
||||
data class ResponseWaiter(val id: Int) {
|
||||
// @Volatile
|
||||
// private var latch = dorkbox.util.sync.CountDownLatch(1)
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val condition = lock.newCondition()
|
||||
|
@ -35,14 +33,12 @@ data class ResponseWaiter(val id: Int) {
|
|||
*/
|
||||
fun prep() {
|
||||
result = null
|
||||
// latch = dorkbox.util.sync.CountDownLatch(1)
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until another thread invokes "doWait"
|
||||
*/
|
||||
fun doNotify() {
|
||||
// latch.countDown()
|
||||
try {
|
||||
lock.withLock {
|
||||
condition.signal()
|
||||
|
@ -55,7 +51,6 @@ data class ResponseWaiter(val id: Int) {
|
|||
* Waits a specific amount of time until another thread invokes "doNotify"
|
||||
*/
|
||||
fun doWait() {
|
||||
// latch.await()
|
||||
try {
|
||||
lock.withLock {
|
||||
condition.await()
|
||||
|
@ -68,7 +63,6 @@ data class ResponseWaiter(val id: Int) {
|
|||
* Waits a specific amount of time until another thread invokes "doNotify"
|
||||
*/
|
||||
fun doWait(timeout: Long) {
|
||||
// latch.await(timeout, TimeUnit.MILLISECONDS)
|
||||
try {
|
||||
lock.withLock {
|
||||
condition.await(timeout, TimeUnit.MILLISECONDS)
|
||||
|
|
|
@ -217,7 +217,7 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
return null
|
||||
}
|
||||
|
||||
else -> throw Exception("Invocation handler could not find RemoteObject method for ${method.name}")
|
||||
else -> throw RmiException("Invocation handler could not find RemoteObject method for ${method.name}")
|
||||
}
|
||||
} else {
|
||||
when (method) {
|
||||
|
@ -238,6 +238,8 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
}
|
||||
}
|
||||
|
||||
val connection = connection
|
||||
|
||||
// setup the RMI request
|
||||
val invokeMethod = MethodRequest()
|
||||
|
||||
|
@ -269,7 +271,12 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
// If we are async, we ignore the response (don't invoke the response manager at all)....
|
||||
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, RemoteObjectStorage.ASYNC_RMI)
|
||||
|
||||
connection.send(invokeMethod)
|
||||
val success = connection.send(invokeMethod)
|
||||
if (!success) {
|
||||
if (!connection.endPoint.sessionManager.enabled()) {
|
||||
throw RmiException("Unable to send async message, an error occurred during the send process")
|
||||
}
|
||||
}
|
||||
|
||||
// if we are async then we return immediately (but must return the correct type!)
|
||||
// If you want the response value, disable async!
|
||||
|
@ -290,17 +297,23 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
return null
|
||||
}
|
||||
|
||||
val logger = connection.logger
|
||||
|
||||
//
|
||||
// this is all SYNC code
|
||||
//
|
||||
|
||||
// The response, even if there is NOT one (ie: not void) will always return a thing (so our code execution is in lockstep -- unless it is ASYNC)
|
||||
val responseWaiter = responseManager.prep(connection.logger)
|
||||
val responseWaiter = responseManager.prep(logger)
|
||||
invokeMethod.packedId = RmiUtils.packShorts(rmiObjectId, responseWaiter.id)
|
||||
|
||||
connection.send(invokeMethod)
|
||||
|
||||
val success = connection.send(invokeMethod)
|
||||
if (!success) {
|
||||
if (!connection.endPoint.sessionManager.enabled()) {
|
||||
responseManager.abort(responseWaiter, logger)
|
||||
throw RmiException("Unable to send message, an error occurred during the send process")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// if a 'suspend' function is called, then our last argument is a 'Continuation' object
|
||||
|
@ -341,7 +354,7 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
responseWaiter.doWait()
|
||||
}
|
||||
|
||||
responseManager.getReply(responseWaiter, timeoutMillis, connection.logger, connection)
|
||||
responseManager.getReply(responseWaiter, timeoutMillis, logger, connection)
|
||||
}
|
||||
|
||||
// function suspension works differently. THIS IS A TRAMPOLINE TO CALL SUSPEND !!
|
||||
|
@ -395,7 +408,7 @@ internal class RmiClient(val isGlobal: Boolean,
|
|||
responseWaiter.doWait()
|
||||
}
|
||||
|
||||
val any = responseManager.getReply(responseWaiter, timeoutMillis, connection.logger, connection)
|
||||
val any = responseManager.getReply(responseWaiter, timeoutMillis, logger, connection)
|
||||
when (any) {
|
||||
ResponseManager.TIMEOUT_EXCEPTION -> {
|
||||
val fancyName = RmiUtils.makeFancyMethodName(method)
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright 2023 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.rmi
|
||||
|
||||
/**
|
||||
* Thrown when there is a generic RMI error (for example, if the RMI message could not be sent, or there is an action on an RMI object that is invalid
|
||||
*/
|
||||
class RmiException : Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(cause: Throwable?) : super(cause) {}
|
||||
}
|
|
@ -92,7 +92,7 @@ class RmiManagerConnections<CONNECTION: Connection> internal constructor(
|
|||
|
||||
// this should be executed on a NEW coroutine!
|
||||
try {
|
||||
callback(proxyObject)
|
||||
callback(proxyObject, rmiId)
|
||||
} catch (exception: Exception) {
|
||||
exception.cleanStackTrace()
|
||||
val newException = RMIException(exception)
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.slf4j.Logger
|
|||
* The impl/proxy objects CANNOT be stored in the same data structure, because their IDs are not tied to the same ID source (and there
|
||||
* would be conflicts in the data structure)
|
||||
*/
|
||||
open class RmiObjectCache(logger: Logger) {
|
||||
open class RmiObjectCache(val logger: Logger) {
|
||||
|
||||
private val implObjects = RemoteObjectStorage(logger)
|
||||
|
||||
|
@ -64,4 +64,22 @@ open class RmiObjectCache(logger: Logger) {
|
|||
internal fun <T: Any> getId(implObject: T): Int {
|
||||
return implObjects.getId(implObject)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return all the saved RMI implementation objects along with their RMI ID. This is used by session management in order to preserve RMI functionality.
|
||||
*/
|
||||
internal fun getAllImplObjects(): List<Pair<Int, Any>> {
|
||||
return implObjects.getAll()
|
||||
}
|
||||
|
||||
/**
|
||||
* @return all the saved RMI implementation objects along with their RMI ID. This is used by session management in order to preserve RMI functionality.
|
||||
*/
|
||||
internal fun restoreAllImplObjects(implObjects: List<Pair<Int, Any>>) {
|
||||
implObjects.forEach {
|
||||
logger.error("RESTORED: ${it.first}")
|
||||
}
|
||||
this.implObjects.restoreAll(implObjects)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import dorkbox.network.rmi.messages.ConnectionObjectCreateRequest
|
|||
import dorkbox.network.rmi.messages.ConnectionObjectDeleteRequest
|
||||
import dorkbox.network.serialization.Serialization
|
||||
import org.slf4j.Logger
|
||||
import java.lang.reflect.Proxy
|
||||
|
||||
/**
|
||||
* Only the server can create or delete a global object
|
||||
|
@ -35,12 +36,9 @@ import org.slf4j.Logger
|
|||
* Connection scope objects can be remotely created or deleted by either end of the connection. Only the server can create/delete a global scope object
|
||||
*/
|
||||
class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
||||
|
||||
|
||||
private val logger: Logger
|
||||
private val connection: CONNECTION
|
||||
private val responseManager: ResponseManager
|
||||
private val serialization: Serialization<CONNECTION>
|
||||
val serialization: Serialization<CONNECTION>
|
||||
private val getGlobalAction: (connection: CONNECTION, objectId: Int, interfaceClass: Class<*>) -> Any
|
||||
|
||||
internal constructor(
|
||||
|
@ -50,7 +48,6 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
serialization: Serialization<CONNECTION>,
|
||||
getGlobalAction: (connection: CONNECTION, objectId: Int, interfaceClass: Class<*>) -> Any
|
||||
) : super(logger) {
|
||||
this.logger = logger
|
||||
this.connection = connection
|
||||
this.responseManager = responseManager
|
||||
this.serialization = serialization
|
||||
|
@ -82,15 +79,48 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
proxyObjects.put(rmiId, remoteObject)
|
||||
}
|
||||
|
||||
private fun <Iface> registerCallback(callback: Iface.() -> Unit): Int {
|
||||
private fun <Iface> registerCallback(callback: Iface.(Int) -> Unit): Int {
|
||||
return remoteObjectCreationCallbacks.register(callback)
|
||||
}
|
||||
|
||||
internal fun removeCallback(callbackId: Int): Any.() -> Unit {
|
||||
internal fun removeCallback(callbackId: Int): Any.(Int) -> Unit {
|
||||
// callback's area always correct, because we track them ourselves.
|
||||
return remoteObjectCreationCallbacks.remove(callbackId)!!
|
||||
}
|
||||
|
||||
/**
|
||||
* @return all the RMI proxy objects used by this connection. This is used by session management in order to preserve RMI functionality.
|
||||
*/
|
||||
internal fun getAllProxyObjects(): List<RemoteObject<*>> {
|
||||
return proxyObjects.values.toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Recreate all the proxy objects for this connection. This is used by session management in order to preserve RMI functionality.
|
||||
*/
|
||||
internal fun recreateProxyObjects(oldProxyObjects: List<RemoteObject<*>>) {
|
||||
oldProxyObjects.forEach {
|
||||
// the interface we care about is ALWAYS the second one!
|
||||
val iface = it.javaClass.interfaces[1]
|
||||
|
||||
val kryoId = connection.endPoint.serialization.getKryoIdForRmiClient(iface)
|
||||
val rmiClient = Proxy.getInvocationHandler(it) as RmiClient
|
||||
val rmiId = rmiClient.rmiObjectId
|
||||
|
||||
val proxyObject = RmiManagerGlobal.createProxyObject(
|
||||
rmiClient.isGlobal,
|
||||
connection,
|
||||
serialization,
|
||||
responseManager,
|
||||
kryoId, rmiId,
|
||||
iface
|
||||
)
|
||||
|
||||
logger.error("RESTORED: $rmiId")
|
||||
saveProxyObject(rmiId, proxyObject)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
@ -150,7 +180,8 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
/**
|
||||
* Creates create a new proxy object where the implementation exists in a remote connection.
|
||||
*
|
||||
* The callback will be notified when the remote object has been created.
|
||||
* We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
* have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
*
|
||||
* Methods that return a value will throw [TimeoutException] if the response is not received with the response timeout [RemoteObject.responseTimeout].
|
||||
*
|
||||
|
@ -162,20 +193,26 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
fun <Iface> create(vararg objectParameters: Any?, callback: Iface.() -> Unit) {
|
||||
fun <Iface> create(vararg objectParameters: Any?, callback: Iface.(rmiId: Int) -> Unit) {
|
||||
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function1::class.java, callback.javaClass, 0) ?: callback.javaClass
|
||||
val kryoId = serialization.getKryoIdForRmiClient(iFaceClass)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
objectParameters as Array<Any?>
|
||||
|
||||
createRemoteObject(connection, kryoId, objectParameters, callback)
|
||||
val callbackId = registerCallback(callback)
|
||||
|
||||
// There is no rmiID yet, because we haven't created it!
|
||||
val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(callbackId, kryoId), objectParameters)
|
||||
|
||||
connection.send(message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates create a new proxy object where the implementation exists in a remote connection.
|
||||
*
|
||||
* The callback will be notified when the remote object has been created.
|
||||
* We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
* have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
*
|
||||
* NOTE:: Methods can throw [TimeoutException] if the response is not received with the response timeout [RemoteObject.responseTimeout].
|
||||
*
|
||||
|
@ -183,15 +220,20 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
* will have the proxy object replaced with the registered (non-proxy) object.
|
||||
*
|
||||
* If one wishes to change the default behavior, cast the object to access the different methods.
|
||||
* ie: `val remoteObject = test as RemoteObject`
|
||||
* ie: `val remoteObject = RemoteObject.cast(obj)`
|
||||
*
|
||||
* @see RemoteObject
|
||||
*/
|
||||
fun <Iface> create(callback: Iface.() -> Unit) {
|
||||
fun <Iface> create(callback: Iface.(rmiId: Int) -> Unit) {
|
||||
val iFaceClass = ClassHelper.getGenericParameterAsClassForSuperClass(Function1::class.java, callback.javaClass, 0) ?: callback.javaClass
|
||||
val kryoId = serialization.getKryoIdForRmiClient(iFaceClass)
|
||||
|
||||
createRemoteObject(connection, kryoId, null, callback)
|
||||
val callbackId = registerCallback(callback)
|
||||
|
||||
// There is no rmiID yet, because we haven't created it!
|
||||
val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(callbackId, kryoId), null)
|
||||
|
||||
connection.send(message)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -310,23 +352,6 @@ class RmiSupportConnection<CONNECTION: Connection> : RmiObjectCache {
|
|||
return proxyObject as Iface
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* on the "client" to create a connection-specific remote object (that exists on the server)
|
||||
*/
|
||||
private fun <Iface> createRemoteObject(connection: CONNECTION, kryoId: Int, objectParameters: Array<Any?>?, callback: Iface.() -> Unit) {
|
||||
val callbackId = registerCallback(callback)
|
||||
|
||||
// There is no rmiID yet, because we haven't created it!
|
||||
val message = ConnectionObjectCreateRequest(RmiUtils.packShorts(callbackId, kryoId), objectParameters)
|
||||
|
||||
// We use a callback to notify us when the object is ready. We can't "create this on the fly" because we
|
||||
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
|
||||
|
||||
// this means we are creating a NEW object on the server
|
||||
connection.send(message)
|
||||
}
|
||||
|
||||
internal fun clear() {
|
||||
proxyObjects.clear()
|
||||
remoteObjectCreationCallbacks.close()
|
||||
|
|
|
@ -3,6 +3,7 @@ module dorkbox.network {
|
|||
exports dorkbox.network.aeron;
|
||||
exports dorkbox.network.connection;
|
||||
exports dorkbox.network.connection.streaming;
|
||||
exports dorkbox.network.connection.session;
|
||||
exports dorkbox.network.connectionType;
|
||||
exports dorkbox.network.exceptions;
|
||||
exports dorkbox.network.handshake;
|
||||
|
@ -11,7 +12,7 @@ module dorkbox.network {
|
|||
exports dorkbox.network.rmi;
|
||||
exports dorkbox.network.serialization;
|
||||
|
||||
requires transitive dorkbox.bytes;
|
||||
requires transitive dorkbox.byteUtils;
|
||||
requires transitive dorkbox.classUtils;
|
||||
requires transitive dorkbox.collections;
|
||||
requires transitive dorkbox.dns;
|
||||
|
@ -25,23 +26,21 @@ module dorkbox.network {
|
|||
requires transitive dorkbox.objectpool;
|
||||
requires transitive dorkbox.os;
|
||||
|
||||
// requires transitive expiringmap;
|
||||
requires transitive expiringmap;
|
||||
requires transitive com.esotericsoftware.kryo;
|
||||
requires transitive com.esotericsoftware.reflectasm;
|
||||
requires transitive org.objenesis;
|
||||
|
||||
requires transitive io.aeron.all;
|
||||
// requires io.aeron.driver;
|
||||
// requires io.aeron.client;
|
||||
// requires org.agrona.core;
|
||||
// requires transitive io.aeron.all; // only needed when debugging builds
|
||||
requires io.aeron.driver;
|
||||
requires io.aeron.client;
|
||||
requires org.agrona.core;
|
||||
|
||||
requires transitive org.slf4j;
|
||||
requires transitive io.github.microutils.kotlinlogging;
|
||||
|
||||
requires transitive kotlinx.atomicfu;
|
||||
|
||||
requires kotlin.stdlib;
|
||||
requires kotlin.stdlib.jdk8;
|
||||
requires kotlinx.coroutines.core;
|
||||
|
||||
// requires kotlinx.coroutines.core.jvm;
|
Loading…
Reference in New Issue