Fixed issues with Client/Server worker thread pool sizes.

This commit is contained in:
nathan 2019-01-17 16:15:26 +01:00
parent e6d8666862
commit aa4fc5c44b
4 changed files with 114 additions and 94 deletions

View File

@ -37,6 +37,7 @@ import dorkbox.util.exceptions.SecurityException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollDatagramChannel;
@ -108,17 +109,16 @@ class Client<C extends Connection> extends EndPointClient implements Connection
localChannelName = config.localChannelName;
hostName = config.host;
if (config.localChannelName != null && config.tcpPort <= 0 && config.udpPort <= 0) {
// no networked bootstraps. LOCAL connection only
Bootstrap localBootstrap = new Bootstrap();
bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap));
Bootstrap localBootstrap = null;
Bootstrap tcpBootstrap = null;
Bootstrap udpBootstrap = null;
localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"))
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(config.localChannelName))
.handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper));
if (config.localChannelName != null) {
localBootstrap = new Bootstrap();
}
else {
if (config.tcpPort > 0 || config.udpPort > 0) {
if (config.host == null) {
throw new IllegalArgumentException("You must define what host you want to connect to.");
}
@ -126,85 +126,108 @@ class Client<C extends Connection> extends EndPointClient implements Connection
if (config.host.equals("0.0.0.0")) {
throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to.");
}
}
if (config.tcpPort <= 0 && config.udpPort <= 0) {
throw new IllegalArgumentException("You must define what port you want to connect to.");
if (config.tcpPort > 0) {
tcpBootstrap = new Bootstrap();
}
if (config.udpPort > 0) {
udpBootstrap = new Bootstrap();
}
if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) {
throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port");
}
if (config.localChannelName != null) {
// no networked bootstraps. LOCAL connection only
bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap));
localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"))
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(config.localChannelName))
.handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper));
}
EventLoopGroup workerEventLoop = null;
if (tcpBootstrap != null || udpBootstrap != null) {
workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
}
if (tcpBootstrap != null) {
bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap));
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
tcpBootstrap.channel(OioSocketChannel.class);
}
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
// epoll network stack is MUCH faster (but only on linux)
tcpBootstrap.channel(EpollSocketChannel.class);
}
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
// KQueue network stack is MUCH faster (but only on macosx)
tcpBootstrap.channel(KQueueSocketChannel.class);
}
else {
tcpBootstrap.channel(NioSocketChannel.class);
}
if (config.tcpPort > 0) {
Bootstrap tcpBootstrap = new Bootstrap();
bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap));
tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.remoteAddress(config.host, config.tcpPort)
.handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, workerEventLoop));
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
tcpBootstrap.channel(OioSocketChannel.class);
}
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
// epoll network stack is MUCH faster (but only on linux)
tcpBootstrap.channel(EpollSocketChannel.class);
}
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
// KQueue network stack is MUCH faster (but only on macosx)
tcpBootstrap.channel(KQueueSocketChannel.class);
}
else {
tcpBootstrap.channel(NioSocketChannel.class);
}
// android screws up on this!!
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
.option(ChannelOption.SO_KEEPALIVE, true);
}
tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.remoteAddress(config.host, config.tcpPort)
.handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper,
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
// android screws up on this!!
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
.option(ChannelOption.SO_KEEPALIVE, true);
if (udpBootstrap != null) {
bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap));
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
udpBootstrap.channel(OioDatagramChannel.class);
}
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
// epoll network stack is MUCH faster (but only on linux)
udpBootstrap.channel(EpollDatagramChannel.class);
}
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
// KQueue network stack is MUCH faster (but only on macosx)
udpBootstrap.channel(KQueueDatagramChannel.class);
}
else {
udpBootstrap.channel(NioDatagramChannel.class);
}
udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// Netty4 has a default of 2048 bytes as upper limit for datagram packets.
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize))
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.localAddress(new InetSocketAddress(0)) // bind to wildcard
.remoteAddress(new InetSocketAddress(config.host, config.udpPort))
.handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, workerEventLoop));
if (config.udpPort > 0) {
Bootstrap udpBootstrap = new Bootstrap();
bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap));
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
udpBootstrap.channel(OioDatagramChannel.class);
}
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
// epoll network stack is MUCH faster (but only on linux)
udpBootstrap.channel(EpollDatagramChannel.class);
}
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
// KQueue network stack is MUCH faster (but only on macosx)
udpBootstrap.channel(KQueueDatagramChannel.class);
}
else {
udpBootstrap.channel(NioDatagramChannel.class);
}
udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// Netty4 has a default of 2048 bytes as upper limit for datagram packets.
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize))
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
.localAddress(new InetSocketAddress(0)) // bind to wildcard
.remoteAddress(new InetSocketAddress(config.host, config.udpPort))
.handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper,
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
// Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
// in order to WRITE: write as normal, just make sure it ends in .255
// in order to LISTEN:
// InetAddress group = InetAddress.getByName("203.0.113.0");
// NioDatagramChannel.joinGroup(group);
// THEN once done
// NioDatagramChannel.leaveGroup(group), close the socket
udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
.option(ChannelOption.SO_SNDBUF, udpMaxSize);
}
// Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
// in order to WRITE: write as normal, just make sure it ends in .255
// in order to LISTEN:
// InetAddress group = InetAddress.getByName("203.0.113.0");
// NioDatagramChannel.joinGroup(group);
// THEN once done
// NioDatagramChannel.leaveGroup(group), close the socket
udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
.option(ChannelOption.SO_SNDBUF, udpMaxSize);
}
}

View File

@ -67,6 +67,11 @@ class Configuration {
*/
public Executor rmiExecutor = null;
/**
* The number of threads used for the worker threads by the end point. By default, this is the CPU_COUNT/2 or 1, whichever is larger.
*/
public int workerThreadPoolSize = Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
public
Configuration() {

View File

@ -34,6 +34,7 @@ import io.netty.bootstrap.SessionBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollDatagramChannel;
@ -158,13 +159,12 @@ class Server<C extends Connection> extends EndPointServer {
.childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper));
}
// don't even bother with TCP/UDP if it's not enabled
if (tcpBootstrap == null && udpBootstrap == null) {
return;
EventLoopGroup workerEventLoop = null;
if (tcpBootstrap != null || udpBootstrap != null) {
workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
}
if (tcpBootstrap != null) {
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
@ -193,8 +193,7 @@ class Server<C extends Connection> extends EndPointServer {
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper,
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
.childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, workerEventLoop));
// have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address!
if (hostName.equals("0.0.0.0")) {
@ -243,8 +242,7 @@ class Server<C extends Connection> extends EndPointServer {
// TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
// OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper,
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, workerEventLoop));
// // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
// if (hostName.equals("0.0.0.0")) {

View File

@ -71,12 +71,6 @@ class Shutdownable {
@Property
public static final int WRITE_BUFF_LOW = 8 * 1024;
/**
* The number of threads used for the worker threads.
*/
@Property
public static int WORKER_THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
/**
* The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully.
*/