Network/test/dorkboxTest/network/app/AeronRmiClientServer.kt

380 lines
13 KiB
Kotlin
Raw Normal View History

2023-03-01 00:22:36 +01:00
/*
* Copyright 2023 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
2023-07-21 00:20:23 +02:00
package dorkboxTest.network.app
2023-03-01 00:22:36 +01:00
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.encoder.PatternLayoutEncoder
import ch.qos.logback.classic.joran.JoranConfigurator
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.ConsoleAppender
2023-07-21 00:20:23 +02:00
import dorkbox.network.*
import dorkbox.network.aeron.CoroutineNoOpIdleStrategy
2023-03-01 00:22:36 +01:00
import dorkbox.network.connection.Connection
import dorkbox.storage.Storage
2023-07-21 00:20:23 +02:00
import dorkbox.util.Sys
2023-03-01 00:22:36 +01:00
import dorkboxTest.network.rmi.cows.TestCow
import dorkboxTest.network.rmi.cows.TestCowImpl
2023-07-21 00:20:23 +02:00
import io.aeron.driver.ThreadingMode
2023-07-11 00:12:09 +02:00
import kotlinx.coroutines.*
2023-07-21 00:20:23 +02:00
import org.agrona.ExpandableDirectByteBuffer
2023-07-24 02:02:28 +02:00
import org.agrona.concurrent.NoOpIdleStrategy
2023-07-11 00:12:09 +02:00
import org.agrona.concurrent.SigInt
2023-03-01 00:22:36 +01:00
import org.slf4j.LoggerFactory
import sun.misc.Unsafe
import java.lang.reflect.Field
2023-07-21 00:20:23 +02:00
import java.security.SecureRandom
import java.util.concurrent.*
2023-03-01 00:22:36 +01:00
import java.util.concurrent.atomic.*
/**
*
*/
class AeronRmiClientServer {
companion object {
2023-06-25 17:25:29 +02:00
// Process-wide counter. Within this file its only reference is in the disabled dispatch loop inside client().
private val counter = AtomicInteger(0)

init {
    silenceIllegalAccessLogger()
    configureConsoleLogging()
}

/**
 * Uses `sun.misc.Unsafe` to write `null` into the static `logger` field of
 * `jdk.internal.module.IllegalAccessLogger`.
 *
 * NOTE(review): presumably this silences the JVM's "illegal reflective access" warning - confirm.
 * Reflection failures (missing field/class, access denied) are printed and otherwise ignored.
 */
private fun silenceIllegalAccessLogger() {
    try {
        val unsafeField = Unsafe::class.java.getDeclaredField("theUnsafe").apply { isAccessible = true }
        val unsafe = unsafeField.get(null) as Unsafe

        val accessLoggerClass = Class.forName("jdk.internal.module.IllegalAccessLogger")
        val loggerField: Field = accessLoggerClass.getDeclaredField("logger")
        val loggerOffset = unsafe.staticFieldOffset(loggerField)
        unsafe.putObjectVolatile(accessLoggerClass, loggerOffset, null)
    } catch (e: NoSuchFieldException) {
        e.printStackTrace()
    } catch (e: IllegalAccessException) {
        e.printStackTrace()
    } catch (e: ClassNotFoundException) {
        e.printStackTrace()
    }
}

/**
 * Resets the logback configuration, limits the "io.netty" and "com.esotericsoftware" loggers to ERROR,
 * and attaches a single console appender to the root logger.
 */
private fun configureConsoleLogging() {
    // assume SLF4J is bound to logback in the current environment
    val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
    val loggerContext = root.loggerContext

    val joran = JoranConfigurator()
    joran.context = loggerContext
    loggerContext.reset() // override default configuration

    // Root level toggles, kept for quick manual switching:
//    root.setLevel(Level.OFF);
//    root.setLevel(Level.INFO);
//    root.level = Level.DEBUG
//    root.level = Level.TRACE
//    root.setLevel(Level.ALL);

    // we only want error messages from these two libraries
    (LoggerFactory.getLogger("io.netty") as Logger).level = Level.ERROR
    (LoggerFactory.getLogger("com.esotericsoftware") as Logger).level = Level.ERROR

    // the encoder must be started before the appender that uses it
    val layout = PatternLayoutEncoder().apply {
        context = loggerContext
        pattern = "%date{HH:mm:ss.SSS} [%t] %-5level [%logger{35}] %msg%n"
        start()
    }

    val console = ConsoleAppender<ILoggingEvent>().apply {
        context = loggerContext
        encoder = layout
        start()
    }

    root.addAppender(console)
}
/**
 * Command-line entry point.
 *
 * Builds a [Config] from [cliArgs] using `dorkbox.config.ConfigProcessor`, then:
 *  - if `config.server` is set, starts the server and blocks until it is closed;
 *  - otherwise, if `config.client` is set, connects a client to `config.ip`, sends a large random byte
 *    array repeatedly until roughly 5 seconds of send time have accumulated, and logs the throughput.
 *
 * Exceptions raised while running the server/client are caught here: the stack trace is printed,
 * followed by "WHOOPS".
 *
 * @param cliArgs Command-line arguments
 *
 * @throws Exception if building the configuration fails (that part runs outside the try/catch)
 */
@Throws(Exception::class)
@JvmStatic
fun main(cliArgs: Array<String>) {
    val config = Config()

    // The returned processor is not used afterwards; process() is called for its effect
    // (presumably populating `config` from the CLI arguments - confirm against ConfigProcessor).
    dorkbox.config.ConfigProcessor(config)
        .envPrefix("")
        .cliArguments(cliArgs)
        .process()

    val acs = AeronRmiClientServer()
    try {
        if (config.server) {
            val server = acs.server()

            server.onMessage<ByteArray> {
                logger.error { "Received Byte array!" }
            }

            runBlocking {
                server.waitForClose()
            }
        }
        else if (config.client) {
            val client = acs.client(0)

            // connect to the configured address; server() binds port 2000
            client.connect(config.ip, 2000, 2001, 0)

            runBlocking {
                val secureRandom = SecureRandom()
                val sizeToTest = ExpandableDirectByteBuffer.MAX_BUFFER_LENGTH / 32

                // don't want to time allocating the mem, just time "serializing and sending"
                val hugeData = ByteArray(sizeToTest)
                secureRandom.nextBytes(hugeData) // REALLY slow!!!

                var count = 0
                var timed = 0L

                client.logger.error { "Initializing test." }

                // just to start it up.
                repeat(5) {
                    client.send(hugeData)
                }

                client.logger.error { "Starting test." }

                val allStopwatch = Stopwatch.createStarted()
                while (TimeUnit.NANOSECONDS.toSeconds(timed) < 5) {
                    client.logger.error { "Starting round: $count" }

                    val roundStopwatch = Stopwatch.createStarted()
                    client.send(hugeData)
                    timed += roundStopwatch.elapsedNanos()

                    client.logger.error { "Finished round $count in: $roundStopwatch" }
                    count++
                }

                val fullElapsed = allStopwatch.elapsedNanos()

                // Use floating point throughout: TimeUnit.toSeconds() truncates, and the loop above can
                // overshoot 5s by up to one full round, so whole-second division overstates the rate.
                val totalBytes = count.toLong() * sizeToTest
                val amountInMB = totalBytes.toDouble() / Sys.MEGABYTE
                val amountInMbit = amountInMB * 8
                // the loop only exits once at least 5s have accumulated, so this is never zero
                val timedInSeconds = timed / 1_000_000_000.0

                // fixed locale so the decimal separator does not depend on the machine running the test
                val twoDecimals = { value: Double -> String.format(java.util.Locale.US, "%.2f", value) }

                client.logger.error { "Finished $count rounds in: ${Sys.getTimePrettyFull(fullElapsed)}" }
                client.logger.error { "Sending data portion took: ${Sys.getTimePrettyFull(timed)} for ${twoDecimals(amountInMB)} MB" }

                client.logger.error { "Rate is: ${twoDecimals(amountInMB / timedInSeconds)} MB/s" }
                client.logger.error { "Rate is: ${twoDecimals(amountInMbit / timedInSeconds)} mb/s" }
            }
        }
    } catch (e: Exception) {
        e.printStackTrace()
        println("WHOOPS")
    }
}
}
2023-05-28 18:41:46 +02:00
/**
 * Creates a [Client] configured through [config], registers logging handlers for
 * init/connect/disconnect/error and for incoming [String] messages, and installs a SIGINT hook that
 * closes the client.
 *
 * The client is returned without being connected; the caller decides where to connect.
 *
 * @param index identifier prefixed to the init/connect log lines
 * @return the prepared, not yet connected client
 */
@OptIn(DelicateCoroutinesApi::class)
fun client(index: Int): Client<Connection> {
    val clientConfig = ClientConfiguration().also { config(it) }
    val rmiClient = Client<Connection>(clientConfig)

    rmiClient.onInit { logger.error("$index: initialized") }

    rmiClient.onConnect {
        logger.error("$index: connected")

        // looked up with id 1, the same id server() saves its TestCowImpl under
        val remoteObject = rmi.get<TestCow>(1)
//        val remoteObjec2 = rmi.getGlobal<TestCow>(44)

//        if (index == 9) {
//            println("PROBLEMS!!")
//        }

        // Disabled RMI call-rate loop, kept for manual experiments:
//        logger.error("$index: starting dispatch")
//        try {
//            GlobalScope.async(Dispatchers.Default) {
//                var startTime = System.nanoTime()
//                logger.error("$index: started dispatch")
//
//                var previousCount = 0
//                while (true) {
//                    val counter = counter.getAndIncrement()
//                    try {
//                        // ping()
//                        // RemoteObject.cast(remoteObject).async {
//                        val value = "$index"
//                        val mooTwoValue = remoteObject.mooTwo(value)
//                        if (mooTwoValue != "moo-two: $value") {
//                            throw Exception("Value not the same!")
//                        }
//                        // remoteObject.mooTwo("count $counter")
//                        // }
//                    } catch (e: Exception) {
//                        e.printStackTrace()
//                        logger.error { "$index: ERROR with client " }
//                        return@async
//                    }
//
//                    val elapsedTime = ( System.nanoTime() - startTime) / 1_000_000_000.0
//                    if (index == 0 && elapsedTime > 1.0) {
//                        logger.error {
//                            val perSecond = ((counter - previousCount) / elapsedTime).toInt()
//                            "Count: $perSecond/sec" }
//                        startTime = System.nanoTime()
//                        previousCount = counter
//                    }
//                }
//            }
//        } catch (e: Exception) {
//            e.printStackTrace()
//        }
    }

    rmiClient.onDisconnect { logger.error("disconnect") }

    rmiClient.onError { cause ->
        logger.error("has error")
        cause.printStackTrace()
    }

    rmiClient.onMessage<String> { text -> logger.error("HAS MESSAGE! $text") }

    // close the client when the process receives SIGINT
    SigInt.register {
        rmiClient.logger.info { "Shutting down via sig-int command" }
        runBlocking {
            rmiClient.close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = false)
        }
    }

    return rmiClient
}
2023-07-21 00:20:23 +02:00
/**
 * Applies the settings shared by the test client and the test server to [config]: in-memory settings
 * store, application id, IPC/IPv6 switches, Aeron threading mode and idle strategies, MTU sizes and
 * buffer/window sizes.
 *
 * @param config the configuration object to modify in place
 */
fun config(config: Configuration) {
    with(config) {
        settingsStore = Storage.Memory() // don't want to persist anything on disk!

        appId = "aeron_test"
        enableIPv6 = false

        enableIpc = true
//        enableIpc = false
//        uniqueAeronDirectory = true
        forceAllowSharedAeronDriver = true

        // dedicate more **OOMPF** to the network
        threadingMode = ThreadingMode.SHARED_NETWORK
//        threadingMode = ThreadingMode.DEDICATED

        pollIdleStrategy = CoroutineNoOpIdleStrategy.INSTANCE
        sendIdleStrategy = CoroutineNoOpIdleStrategy.INSTANCE

        // only if there are enough threads on the box!
        val hasSpareProcessors = Runtime.getRuntime().availableProcessors() >= 4
        if (hasSpareProcessors) {
//            conductorIdleStrategy = BusySpinIdleStrategy.INSTANCE
//            sharedIdleStrategy = NoOpIdleStrategy.INSTANCE
            receiverIdleStrategy = NoOpIdleStrategy.INSTANCE
            senderIdleStrategy = NoOpIdleStrategy.INSTANCE
        }

        // https://blah.cloud/networks/test-jumbo-frames-working/
        // This must be a multiple of 32, and we leave some space for headers/etc
        networkMtuSize = 8192
        ipcMtuSize = io.aeron.driver.Configuration.MAX_UDP_PAYLOAD_LENGTH

        // 4 MB for receive
        receiveBufferSize = 4_194_304

        // recommended for 10gbps networks
        initialWindowLength = io.aeron.driver.Configuration.INITIAL_WINDOW_LENGTH_DEFAULT

        maxStreamSizeInMemoryMB = 128
    }
}
/**
 * Creates the test [Server]: applies the shared settings from [config], registers the
 * [TestCow]/[TestCowImpl] RMI pair, verifies that a previous Aeron instance has stopped, wires up logging
 * handlers, binds port 2000 and installs a SIGINT hook that closes the server.
 *
 * @return the server, already bound
 * @throws IllegalStateException if `ensureStopped()` reports that Aeron did not shut down
 */
fun server(): Server<Connection> {
    val serverConfig = ServerConfiguration().also {
        config(it)

        it.listenIpAddress = "*"
        it.maxClientCount = 50
        it.maxConnectionsPerIpAddress = 50

        it.serialization.rmi.register(TestCow::class.java, TestCowImpl::class.java)
    }

    val rmiServer = Server<Connection>(serverConfig)
//    rmiServer.rmiGlobal.save(TestCowImpl(44), 44)

    // we must always make sure that aeron is shut-down before starting again.
    runBlocking {
        check(rmiServer.ensureStopped()) { "Aeron was unable to shut down in a timely manner." }
    }

    rmiServer.onInit {
        logger.error("initialized")
        // saved under id 1, the id the client side looks up
        rmi.save(TestCowImpl(1), 1)
    }

    rmiServer.onConnect { logger.error("connected: $this") }

    rmiServer.onDisconnect { logger.error("disconnect: $this") }

    rmiServer.onErrorGlobal { cause ->
        rmiServer.logger.error("from test: has error")
        cause.printStackTrace()
    }

    rmiServer.onError { cause ->
        logger.error("from test: has connection error: $this")
        cause.printStackTrace()
    }

    rmiServer.bind(2000)

    // close the server when the process receives SIGINT
    SigInt.register {
        rmiServer.logger.info { "Shutting down via sig-int command" }
        runBlocking {
            rmiServer.close(closeEverything = true, initiatedByClientClose = false, initiatedByShutdown = false)
        }
    }

    return rmiServer
}
}
2023-07-21 00:20:23 +02:00