connections are now created via a unit function instead of overriding the method.

This commit is contained in:
Robinson 2021-07-09 15:16:12 +02:00
parent c725806727
commit 388e4bda60
5 changed files with 53 additions and 35 deletions

View File

@ -38,13 +38,24 @@ import java.net.InetAddress
/** /**
* The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
* ASYNC. * ASYNC.
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/ */
open class Client<CONNECTION : Connection>(config: Configuration = Configuration()) : EndPoint<CONNECTION>(config) { open class Client<CONNECTION : Connection>(
config: Configuration = Configuration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION = {
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
})
: EndPoint<CONNECTION>(config, connectionFunc) {
companion object { companion object {
/** /**
* Gets the version number. * Gets the version number.
*/ */
const val version = "5.2" const val version = "5.3"
/** /**
* Checks to see if a client (using the specified configuration) is running. * Checks to see if a client (using the specified configuration) is running.
@ -437,9 +448,9 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
val newConnection: CONNECTION val newConnection: CONNECTION
if (isUsingIPC) { if (isUsingIPC) {
newConnection = newConnection(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) newConnection = connectionFunc(ConnectionParams(this, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport))
} else { } else {
newConnection = newConnection(ConnectionParams(this, clientConnection, validateRemoteAddress, rmiConnectionSupport)) newConnection = connectionFunc(ConnectionParams(this, clientConnection, validateRemoteAddress, rmiConnectionSupport))
remoteAddress!! remoteAddress!!
// VALIDATE are we allowed to connect to this server (now that we have the initial server information) // VALIDATE are we allowed to connect to this server (now that we have the initial server information)
@ -604,7 +615,6 @@ open class Client<CONNECTION : Connection>(config: Configuration = Configuration
return if (c != null) { return if (c != null) {
c.send(message) c.send(message)
true
} else { } else {
val exception = ClientException("Cannot send a message when there is no connection!") val exception = ClientException("Cannot send a message when there is no connection!")
logger.error("No connection!", exception) logger.error("No connection!", exception)

View File

@ -38,7 +38,7 @@ class ServerConfiguration : dorkbox.network.Configuration() {
/** /**
* Gets the version number. * Gets the version number.
*/ */
const val version = "5.2" const val version = "5.3"
} }
/** /**
@ -279,9 +279,9 @@ open class Configuration {
} }
/** /**
* Specify the serialization manager to use. * Specify the serialization manager to use. The type must extend `Connection`, since this will be cast
*/ */
var serialization: Serialization<Connection> = Serialization() var serialization: Serialization<*> = Serialization<Connection>()
set(value) { set(value) {
require(!contextDefined) { errorMessage } require(!contextDefined) { errorMessage }
field = value field = value

View File

@ -21,6 +21,7 @@ import dorkbox.network.aeron.AeronPoller
import dorkbox.network.aeron.IpcMediaDriverConnection import dorkbox.network.aeron.IpcMediaDriverConnection
import dorkbox.network.aeron.UdpMediaDriverServerConnection import dorkbox.network.aeron.UdpMediaDriverServerConnection
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.network.connection.ConnectionParams
import dorkbox.network.connection.EndPoint import dorkbox.network.connection.EndPoint
import dorkbox.network.connection.eventLoop import dorkbox.network.connection.eventLoop
import dorkbox.network.connectionType.ConnectionRule import dorkbox.network.connectionType.ConnectionRule
@ -44,15 +45,24 @@ import java.util.concurrent.TimeUnit
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
* server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections()) * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
* *
*
* To put it bluntly, ONLY have the server do work inside of a listener! * To put it bluntly, ONLY have the server do work inside of a listener!
*
* @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
*/ */
open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerConfiguration()) : EndPoint<CONNECTION>(config) { open class Server<CONNECTION : Connection>(
config: ServerConfiguration = ServerConfiguration(),
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION = {
@Suppress("UNCHECKED_CAST")
Connection(it) as CONNECTION
})
: EndPoint<CONNECTION>(config, connectionFunc) {
companion object { companion object {
/** /**
* Gets the version number. * Gets the version number.
*/ */
const val version = "5.2" const val version = "5.3"
/** /**
* Checks to see if a server (using the specified configuration) is running. * Checks to see if a server (using the specified configuration) is running.
@ -184,6 +194,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
sessionId, sessionId,
message, message,
aeronDriver, aeronDriver,
connectionFunc,
logger) logger)
} }
@ -271,6 +282,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
message, message,
aeronDriver, aeronDriver,
false, false,
connectionFunc,
logger) logger)
} }
@ -358,6 +370,7 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
message, message,
aeronDriver, aeronDriver,
false, false,
connectionFunc,
logger) logger)
} }
@ -445,7 +458,8 @@ open class Server<CONNECTION : Connection>(config: ServerConfiguration = ServerC
message, message,
aeronDriver, aeronDriver,
true, true,
logger) connectionFunc,
logger)
} }
override fun poll(): Int { return subscription.poll(handler, 1) } override fun poll(): Int { return subscription.poll(handler, 1) }

View File

@ -65,13 +65,20 @@ fun CoroutineScope.eventLoop(block: suspend CoroutineScope.() -> Unit): Job {
* *
* @param type this is either "Client" or "Server", depending on who is creating this endpoint. * @param type this is either "Client" or "Server", depending on who is creating this endpoint.
* @param config these are the specific connection options * @param config these are the specific connection options
* @param connectionFunc allows for custom connection implementations defined as a unit function
* *
* @throws SecurityException if unable to initialize/generate ECC keys * @throws SecurityException if unable to initialize/generate ECC keys
*/ */
abstract class EndPoint<CONNECTION : Connection> abstract class EndPoint<CONNECTION : Connection>
internal constructor(val type: Class<*>, internal val config: Configuration) : AutoCloseable { internal constructor(val type: Class<*>,
protected constructor(config: Configuration) : this(Client::class.java, config) internal val config: Configuration,
protected constructor(config: ServerConfiguration) : this(Server::class.java, config) @Suppress("UNCHECKED_CAST")
internal val connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION)
: AutoCloseable {
@Suppress("UNCHECKED_CAST")
protected constructor(config: Configuration, connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION) : this(Client::class.java, config, connectionFunc)
protected constructor(config: ServerConfiguration, connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION) : this(Server::class.java, config, connectionFunc)
val logger: KLogger = KotlinLogging.logger(type.simpleName) val logger: KLogger = KotlinLogging.logger(type.simpleName)
@ -149,13 +156,13 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
if (type.javaClass == Server::class.java) { if (type.javaClass == Server::class.java) {
// server cannot "get" global RMI objects, only the client can // server cannot "get" global RMI objects, only the client can
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization<CONNECTION>) rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization)
{ _, _, _ -> { _, _, _ ->
throw IllegalAccessException("Global RMI access is only possible from a Client connection!") throw IllegalAccessException("Global RMI access is only possible from a Client connection!")
} }
} else { } else {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, config.serialization as Serialization<CONNECTION>) rmiConnectionSupport = RmiManagerConnections(logger, responseManager, listenerManager, serialization)
{ connection, objectId, interfaceClass -> { connection, objectId, interfaceClass ->
return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass) return@RmiManagerConnections rmiGlobalSupport.getGlobalRemoteObject(connection, objectId, interfaceClass)
} }
@ -207,21 +214,6 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
connections.remove(connection) connections.remove(connection)
} }
/**
* This method allows the connections used by the client/server to be subclassed (with custom implementations).
*
* As this is for the network stack, the new connection MUST subclass [Connection]
*
* The parameters are ALL NULL when getting the base class, as this instance is just thrown away.
*
* @return a new network connection
*/
@Suppress("MemberVisibilityCanBePrivate")
open fun newConnection(connectionParameters: ConnectionParams<CONNECTION>): CONNECTION {
@Suppress("UNCHECKED_CAST")
return Connection(connectionParameters) as CONNECTION
}
/** /**
* Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server. * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server.
* *

View File

@ -195,6 +195,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
sessionId: Int, sessionId: Int,
message: HandshakeMessage, message: HandshakeMessage,
aeronDriver: AeronDriver, aeronDriver: AeronDriver,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
logger: KLogger logger: KLogger
) { ) {
@ -278,7 +279,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
"[${clientConnection.sessionId}] IPC connection established to [${clientConnection.streamIdSubscription}|${clientConnection.streamId}]" "[${clientConnection.sessionId}] IPC connection established to [${clientConnection.streamIdSubscription}|${clientConnection.streamId}]"
} }
val connection = server.newConnection(ConnectionParams(server, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport)) val connection = connectionFunc(ConnectionParams(server, clientConnection, PublicKeyValidationState.VALID, rmiConnectionSupport))
// VALIDATE:: are we allowed to connect to this server (now that we have the initial server information) // VALIDATE:: are we allowed to connect to this server (now that we have the initial server information)
// NOTE: all IPC client connections are, by default, always allowed to connect, because they are running on the same machine // NOTE: all IPC client connections are, by default, always allowed to connect, because they are running on the same machine
@ -335,6 +336,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
message: HandshakeMessage, message: HandshakeMessage,
aeronDriver: AeronDriver, aeronDriver: AeronDriver,
isIpv6Wildcard: Boolean, isIpv6Wildcard: Boolean,
connectionFunc: (connectionParameters: ConnectionParams<CONNECTION>) -> CONNECTION,
logger: KLogger) { logger: KLogger) {
if (!validateMessageTypeAndDoPending( if (!validateMessageTypeAndDoPending(
@ -445,7 +447,7 @@ internal class ServerHandshake<CONNECTION : Connection>(logger: KLogger,
"Creating new connection from $clientAddressString [$subscriptionPort|$publicationPort] [$connectionStreamId|$connectionSessionId] (reliable:${message.isReliable})" "Creating new connection from $clientAddressString [$subscriptionPort|$publicationPort] [$connectionStreamId|$connectionSessionId] (reliable:${message.isReliable})"
} }
val connection = server.newConnection(ConnectionParams(server, clientConnection, validateRemoteAddress, rmiConnectionSupport)) val connection = connectionFunc(ConnectionParams(server, clientConnection, validateRemoteAddress, rmiConnectionSupport))
// VALIDATE:: are we allowed to connect to this server (now that we have the initial server information) // VALIDATE:: are we allowed to connect to this server (now that we have the initial server information)
val permitConnection = listenerManager.notifyFilter(connection) val permitConnection = listenerManager.notifyFilter(connection)