diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index 34095a51..488084be 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -37,6 +37,7 @@ import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollDatagramChannel; @@ -108,17 +109,16 @@ class Client extends EndPointClient implements Connection localChannelName = config.localChannelName; hostName = config.host; - if (config.localChannelName != null && config.tcpPort <= 0 && config.udpPort <= 0) { - // no networked bootstraps. LOCAL connection only - Bootstrap localBootstrap = new Bootstrap(); - bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap)); + Bootstrap localBootstrap = null; + Bootstrap tcpBootstrap = null; + Bootstrap udpBootstrap = null; - localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS")) - .channel(LocalChannel.class) - .remoteAddress(new LocalAddress(config.localChannelName)) - .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper)); + + if (config.localChannelName != null) { + localBootstrap = new Bootstrap(); } - else { + + if (config.tcpPort > 0 || config.udpPort > 0) { if (config.host == null) { throw new IllegalArgumentException("You must define what host you want to connect to."); } @@ -126,85 +126,108 @@ class Client extends EndPointClient implements Connection if (config.host.equals("0.0.0.0")) { throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to."); } + } - if (config.tcpPort <= 0 && config.udpPort <= 0) { - throw new IllegalArgumentException("You must define what port you want to connect to."); + if (config.tcpPort > 0) { + tcpBootstrap = new Bootstrap(); + } + + if (config.udpPort > 0) { + udpBootstrap = new Bootstrap(); + } + + if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) { + throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port"); + } + + + + if (config.localChannelName != null) { + // no networked bootstraps. LOCAL connection only + + bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap)); + + localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS")) + .channel(LocalChannel.class) + .remoteAddress(new LocalAddress(config.localChannelName)) + .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper)); + } + + + EventLoopGroup workerEventLoop = null; + if (tcpBootstrap != null || udpBootstrap != null) { + workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); + } + + + if (tcpBootstrap != null) { + bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap)); + + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + tcpBootstrap.channel(OioSocketChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + tcpBootstrap.channel(EpollSocketChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + tcpBootstrap.channel(KQueueSocketChannel.class); + } + else { + tcpBootstrap.channel(NioSocketChannel.class); } - if (config.tcpPort > 0) { - Bootstrap tcpBootstrap = new Bootstrap(); - bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap)); + tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS")) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + .remoteAddress(config.host, config.tcpPort) + .handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, workerEventLoop)); - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - tcpBootstrap.channel(OioSocketChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - tcpBootstrap.channel(EpollSocketChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - tcpBootstrap.channel(KQueueSocketChannel.class); - } - else { - tcpBootstrap.channel(NioSocketChannel.class); - } + // android screws up on this!! + tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) + .option(ChannelOption.SO_KEEPALIVE, true); + } - tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS")) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .remoteAddress(config.host, config.tcpPort) - .handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, - newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); - // android screws up on this!! - tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) - .option(ChannelOption.SO_KEEPALIVE, true); + if (udpBootstrap != null) { + bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap)); + + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + udpBootstrap.channel(OioDatagramChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + udpBootstrap.channel(EpollDatagramChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + udpBootstrap.channel(KQueueDatagramChannel.class); + } + else { + udpBootstrap.channel(NioDatagramChannel.class); } + udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS")) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // Netty4 has a default of 2048 bytes as upper limit for datagram packets. + .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize)) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + .localAddress(new InetSocketAddress(0)) // bind to wildcard + .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) + .handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, workerEventLoop)); - if (config.udpPort > 0) { - Bootstrap udpBootstrap = new Bootstrap(); - bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap)); - - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - udpBootstrap.channel(OioDatagramChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - udpBootstrap.channel(EpollDatagramChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - udpBootstrap.channel(KQueueDatagramChannel.class); - } - else { - udpBootstrap.channel(NioDatagramChannel.class); - } - - - udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS")) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - // Netty4 has a default of 2048 bytes as upper limit for datagram packets. - .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize)) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .localAddress(new InetSocketAddress(0)) // bind to wildcard - .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) - .handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, - newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); - - // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) - // in order to WRITE: write as normal, just make sure it ends in .255 - // in order to LISTEN: - // InetAddress group = InetAddress.getByName("203.0.113.0"); - // NioDatagramChannel.joinGroup(group); - // THEN once done - // NioDatagramChannel.leaveGroup(group), close the socket - udpBootstrap.option(ChannelOption.SO_BROADCAST, false) - .option(ChannelOption.SO_SNDBUF, udpMaxSize); - } + // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) + // in order to WRITE: write as normal, just make sure it ends in .255 + // in order to LISTEN: + // InetAddress group = InetAddress.getByName("203.0.113.0"); + // NioDatagramChannel.joinGroup(group); + // THEN once done + // NioDatagramChannel.leaveGroup(group), close the socket + udpBootstrap.option(ChannelOption.SO_BROADCAST, false) + .option(ChannelOption.SO_SNDBUF, udpMaxSize); } } diff --git a/src/dorkbox/network/Configuration.java b/src/dorkbox/network/Configuration.java index afce47dc..642de464 100644 --- a/src/dorkbox/network/Configuration.java +++ b/src/dorkbox/network/Configuration.java @@ -67,6 +67,11 @@ class Configuration { */ public Executor rmiExecutor = null; + /** + * The number of threads used for the worker threads by the end point. By default, this is the CPU_COUNT/2 or 1, whichever is larger. + */ + public int workerThreadPoolSize = Math.max(Runtime.getRuntime().availableProcessors() / 2, 1); + public Configuration() { diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index be13d574..ec71a3cd 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -34,6 +34,7 @@ import io.netty.bootstrap.SessionBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollDatagramChannel; @@ -158,13 +159,12 @@ class Server extends EndPointServer { .childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper)); } - // don't even bother with TCP/UDP if it's not enabled - if (tcpBootstrap == null && udpBootstrap == null) { - return; + + EventLoopGroup workerEventLoop = null; + if (tcpBootstrap != null || udpBootstrap != null) { + workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); } - - if (tcpBootstrap != null) { if (OS.isAndroid()) { // android ONLY supports OIO (not NIO) @@ -193,8 +193,7 @@ class Server extends EndPointServer { .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, - newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); + .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, workerEventLoop)); // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address! if (hostName.equals("0.0.0.0")) { @@ -243,8 +242,7 @@ class Server extends EndPointServer { // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets) .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722 - .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, - newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); + .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, workerEventLoop)); // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! // if (hostName.equals("0.0.0.0")) { diff --git a/src/dorkbox/network/connection/Shutdownable.java b/src/dorkbox/network/connection/Shutdownable.java index 4c86a7d7..7d16cb1b 100644 --- a/src/dorkbox/network/connection/Shutdownable.java +++ b/src/dorkbox/network/connection/Shutdownable.java @@ -71,12 +71,6 @@ class Shutdownable { @Property public static final int WRITE_BUFF_LOW = 8 * 1024; - /** - * The number of threads used for the worker threads. - */ - @Property - public static int WORKER_THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors() / 2, 1); - /** * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. */