From 388e4bda609255ff2864ca0eae0845ca41a31b5b Mon Sep 17 00:00:00 2001 From: Robinson Date: Fri, 9 Jul 2021 15:16:12 +0200 Subject: [PATCH] connections are now created via a unit function instead of overriding the method. --- src/dorkbox/network/Client.kt | 20 ++++++++--- src/dorkbox/network/Configuration.kt | 6 ++-- src/dorkbox/network/Server.kt | 22 +++++++++--- src/dorkbox/network/connection/EndPoint.kt | 34 +++++++------------ .../network/handshake/ServerHandshake.kt | 6 ++-- 5 files changed, 53 insertions(+), 35 deletions(-) diff --git a/src/dorkbox/network/Client.kt b/src/dorkbox/network/Client.kt index bee2a092..5ad5fec0 100644 --- a/src/dorkbox/network/Client.kt +++ b/src/dorkbox/network/Client.kt @@ -38,13 +38,24 @@ import java.net.InetAddress /** * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's * ASYNC. + * + * @param config these are the specific connection options + * @param connectionFunc allows for custom connection implementations defined as a unit function */ -open class Client(config: Configuration = Configuration()) : EndPoint(config) { +open class Client( + config: Configuration = Configuration(), + connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION = { + @Suppress("UNCHECKED_CAST") + Connection(it) as CONNECTION + }) + : EndPoint(config, connectionFunc) { + + companion object { /** * Gets the version number. */ - const val version = "5.2" + const val version = "5.3" /** * Checks to see if a client (using the specified configuration) is running. @@ -437,9 +448,9 @@ open class Client(config: Configuration = Configuration val newConnection: CONNECTION if (isUsingIPC) { - newConnection = newConnection(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) + newConnection = connectionFunc(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) } else { - newConnection = newConnection(ConnectionParams(this, clientConnection, validateRemoteAddress, rmiConnectionSupport)) + newConnection = connectionFunc(ConnectionParams(this, clientConnection, validateRemoteAddress, rmiConnectionSupport)) remoteAddress!! // VALIDATE are we allowed to connect to this server (now that we have the initial server information) @@ -604,7 +615,6 @@ open class Client(config: Configuration = Configuration return if (c != null) { c.send(message) - true } else { val exception = ClientException("Cannot send a message when there is no connection!") logger.error("No connection!", exception) diff --git a/src/dorkbox/network/Configuration.kt b/src/dorkbox/network/Configuration.kt index 9bb89cd6..0c2574af 100644 --- a/src/dorkbox/network/Configuration.kt +++ b/src/dorkbox/network/Configuration.kt @@ -38,7 +38,7 @@ class ServerConfiguration : dorkbox.network.Configuration() { /** * Gets the version number. */ - const val version = "5.2" + const val version = "5.3" } /** @@ -279,9 +279,9 @@ open class Configuration { } /** - * Specify the serialization manager to use. + * Specify the serialization manager to use. The type must extend `Connection`, since this will be cast */ - var serialization: Serialization = Serialization() + var serialization: Serialization<*> = Serialization() set(value) { require(!contextDefined) { errorMessage } field = value diff --git a/src/dorkbox/network/Server.kt b/src/dorkbox/network/Server.kt index 7c3b4f15..27ee8961 100644 --- a/src/dorkbox/network/Server.kt +++ b/src/dorkbox/network/Server.kt @@ -21,6 +21,7 @@ import dorkbox.network.aeron.AeronPoller import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.UdpMediaDriverServerConnection import dorkbox.network.connection.Connection +import dorkbox.network.connection.ConnectionParams import dorkbox.network.connection.EndPoint import dorkbox.network.connection.eventLoop import dorkbox.network.connectionType.ConnectionRule @@ -44,15 +45,24 @@ import java.util.concurrent.TimeUnit * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections()) * - * * To put it bluntly, ONLY have the server do work inside of a listener! + * + * @param config these are the specific connection options + * @param connectionFunc allows for custom connection implementations defined as a unit function */ -open class Server(config: ServerConfiguration = ServerConfiguration()) : EndPoint(config) { +open class Server( + config: ServerConfiguration = ServerConfiguration(), + connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION = { + @Suppress("UNCHECKED_CAST") + Connection(it) as CONNECTION + }) + : EndPoint(config, connectionFunc) { + companion object { /** * Gets the version number. */ - const val version = "5.2" + const val version = "5.3" /** * Checks to see if a server (using the specified configuration) is running. @@ -184,6 +194,7 @@ open class Server(config: ServerConfiguration = ServerC sessionId, message, aeronDriver, + connectionFunc, logger) } @@ -271,6 +282,7 @@ open class Server(config: ServerConfiguration = ServerC message, aeronDriver, false, + connectionFunc, logger) } @@ -358,6 +370,7 @@ open class Server(config: ServerConfiguration = ServerC message, aeronDriver, false, + connectionFunc, logger) } @@ -445,7 +458,8 @@ open class Server(config: ServerConfiguration = ServerC message, aeronDriver, true, - logger) + connectionFunc, + logger) } override fun poll(): Int { return subscription.poll(handler, 1) } diff --git a/src/dorkbox/network/connection/EndPoint.kt b/src/dorkbox/network/connection/EndPoint.kt index f5e1a4e8..bb59c215 100644 --- a/src/dorkbox/network/connection/EndPoint.kt +++ b/src/dorkbox/network/connection/EndPoint.kt @@ -65,13 +65,20 @@ fun CoroutineScope.eventLoop(block: suspend CoroutineScope.() -> Unit): Job { * * @param type this is either "Client" or "Server", depending on who is creating this endpoint. * @param config these are the specific connection options + * @param connectionFunc allows for custom connection implementations defined as a unit function * - * @throws SecurityException if unable to initialize/generate ECC keys + * @throws SecurityException if unable to initialize/generate ECC keys */ abstract class EndPoint -internal constructor(val type: Class<*>, internal val config: Configuration) : AutoCloseable { - protected constructor(config: Configuration) : this(Client::class.java, config) - protected constructor(config: ServerConfiguration) : this(Server::class.java, config) +internal constructor(val type: Class<*>, + internal val config: Configuration, + @Suppress("UNCHECKED_CAST") + internal val connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION) + : AutoCloseable { + + @Suppress("UNCHECKED_CAST") + protected constructor(config: Configuration, connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION) : this(Client::class.java, config, connectionFunc) + protected constructor(config: ServerConfiguration, connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION) : this(Server::class.java, config, connectionFunc) val logger: KLogger = KotlinLogging.logger(type.simpleName) @@ -149,13 +156,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A if (type.javaClass == Server::class.java) { // server cannot "get" global RMI objects, only the client can @Suppress("UNCHECKED_CAST") - rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization) + rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization) { _, _, _ -> throw IllegalAccessException("Global RMI access is only possible from a Client connection!") } } else { @Suppress("UNCHECKED_CAST") - rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization) + rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization) { connection, objectId, interfaceClass -> return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass) } @@ -207,21 +214,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A connections.remove(connection) } - /** - * This method allows the connections used by the client/server to be subclassed (with custom implementations). - * - * As this is for the network stack, the new connection MUST subclass [Connection] - * - * The parameters are ALL NULL when getting the base class, as this instance is just thrown away. - * - * @return a new network connection - */ - @Suppress("MemberVisibilityCanBePrivate") - open fun newConnection(connectionParameters: ConnectionParams): CONNECTION { - @Suppress("UNCHECKED_CAST") - return Connection(connectionParameters) as CONNECTION - } - /** * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server. * diff --git a/src/dorkbox/network/handshake/ServerHandshake.kt b/src/dorkbox/network/handshake/ServerHandshake.kt index 297a91d9..300694c3 100644 --- a/src/dorkbox/network/handshake/ServerHandshake.kt +++ b/src/dorkbox/network/handshake/ServerHandshake.kt @@ -195,6 +195,7 @@ internal class ServerHandshake(logger: KLogger, sessionId: Int, message: HandshakeMessage, aeronDriver: AeronDriver, + connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION, logger: KLogger ) { @@ -278,7 +279,7 @@ internal class ServerHandshake(logger: KLogger, "[${clientConnection.sessionId}] IPC connection established to [${clientConnection.streamIdSubscription}|${clientConnection.streamId}]" } - val connection = server.newConnection(ConnectionParams(server, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) + val connection = connectionFunc(ConnectionParams(server, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) // VALIDATE:: are we allowed to connect to this server (now that we have the initial server information) // NOTE: all IPC client connections are, by default, always allowed to connect, because they are running on the same machine @@ -335,6 +336,7 @@ internal class ServerHandshake(logger: KLogger, message: HandshakeMessage, aeronDriver: AeronDriver, isIpv6Wildcard: Boolean, + connectionFunc: (connectionParameters: ConnectionParams) -> CONNECTION, logger: KLogger) { if (!validateMessageTypeAndDoPending( @@ -445,7 +447,7 @@ internal class ServerHandshake(logger: KLogger, "Creating new connection from $clientAddressString [$subscriptionPort|$publicationPort] [$connectionStreamId|$connectionSessionId] (reliable:${message.isReliable})" } - val connection = server.newConnection(ConnectionParams(server, clientConnection, validateRemoteAddress, rmiConnectionSupport)) + val connection = connectionFunc(ConnectionParams(server, clientConnection, validateRemoteAddress, rmiConnectionSupport)) // VALIDATE:: are we allowed to connect to this server (now that we have the initial server information) val permitConnection = listenerManager.notifyFilter(connection)