From 0af57379e6c25254e2f489ff4fa3b37cc03cbf3e Mon Sep 17 00:00:00 2001 From: nathan Date: Mon, 2 Apr 2018 16:06:58 +0200 Subject: [PATCH] Fixed issues with multiple worker threads --- src/dorkbox/network/Client.java | 11 +- src/dorkbox/network/DnsServer.java | 8 +- src/dorkbox/network/Server.java | 13 +- .../network/connection/Shutdownable.java | 5 +- .../registration/RegistrationHandler.java | 5 + .../local/RegistrationLocalHandler.java | 5 +- .../local/RegistrationLocalHandlerClient.java | 5 +- .../local/RegistrationLocalHandlerServer.java | 5 +- .../remote/RegistrationRemoteHandler.java | 153 +++++++++++------- .../RegistrationRemoteHandlerClient.java | 12 +- .../RegistrationRemoteHandlerServer.java | 13 +- 11 files changed, 122 insertions(+), 113 deletions(-) diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index df781d46..0e33faf3 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -37,7 +37,6 @@ import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollDatagramChannel; @@ -108,8 +107,6 @@ class Client extends EndPointClient implements Connection localChannelName = config.localChannelName; hostName = config.host; - final EventLoopGroup workerEventLoop = newEventLoop(DEFAULT_THREAD_POOL_SIZE, threadName); - if (config.localChannelName != null && config.tcpPort <= 0 && config.udpPort <= 0) { // no networked bootstraps. LOCAL connection only @@ -119,7 +116,7 @@ class Client extends EndPointClient implements Connection localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS")) .channel(LocalChannel.class) .remoteAddress(new LocalAddress(config.localChannelName)) - .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper, workerEventLoop)); + .handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper)); } else { if (config.host == null) { @@ -154,7 +151,8 @@ class Client extends EndPointClient implements Connection .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .remoteAddress(config.host, config.tcpPort) - .handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, workerEventLoop)); + .handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, + newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); // android screws up on this!! tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) @@ -190,7 +188,8 @@ class Client extends EndPointClient implements Connection .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .localAddress(new InetSocketAddress(0)) // bind to wildcard .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) - .handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, workerEventLoop)); + .handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, + newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) // in order to WRITE: write as normal, just make sure it ends in .255 diff --git a/src/dorkbox/network/DnsServer.java b/src/dorkbox/network/DnsServer.java index 924841bf..0d3ac0e3 100644 --- a/src/dorkbox/network/DnsServer.java +++ b/src/dorkbox/network/DnsServer.java @@ -132,22 +132,22 @@ class DnsServer extends Shutdownable { if (OS.isAndroid()) { // android ONLY supports OIO (not NIO) boss = new OioEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup)); - work = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + work = new OioEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); } else if (OS.isLinux() && NativeLibrary.isAvailable()) { // epoll network stack is MUCH faster (but only on linux) boss = new EpollEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup)); - work = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + work = new EpollEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); } else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { // KQueue network stack is MUCH faster (but only on macosx) boss = new KQueueEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup)); - work = new KQueueEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + work = new KQueueEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); } else { // sometimes the native libraries cannot be loaded, so fall back to NIO boss = new NioEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup)); - work = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + work = new NioEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); } manageForShutdown(boss); diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index 93b26d73..1f797b30 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -34,7 +34,6 @@ import io.netty.bootstrap.SessionBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollDatagramChannel; @@ -147,18 +146,16 @@ class Server extends EndPointServer { String threadName = Server.class.getSimpleName(); - final EventLoopGroup workerEventLoop = newEventLoop(DEFAULT_THREAD_POOL_SIZE, threadName); - // always use local channels on the server. if (localBootstrap != null) { localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"), - newEventLoop(LOCAL, 1, threadName + "-JVM-HAND")) + newEventLoop(LOCAL, 1, threadName )) .channel(LocalServerChannel.class) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) .localAddress(new LocalAddress(localChannelName)) - .childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper, workerEventLoop)); + .childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper)); } // don't even bother with TCP/UDP if it's not enabled @@ -196,7 +193,8 @@ class Server extends EndPointServer { .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, workerEventLoop)); + .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, + newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address! if (hostName.equals("0.0.0.0")) { @@ -245,7 +243,8 @@ class Server extends EndPointServer { // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets) .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722 - .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, workerEventLoop)); + .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, + newEventLoop(WORKER_THREAD_POOL_SIZE, threadName))); // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! // if (hostName.equals("0.0.0.0")) { diff --git a/src/dorkbox/network/connection/Shutdownable.java b/src/dorkbox/network/connection/Shutdownable.java index a436145d..d8e2ba61 100644 --- a/src/dorkbox/network/connection/Shutdownable.java +++ b/src/dorkbox/network/connection/Shutdownable.java @@ -72,11 +72,10 @@ class Shutdownable { public static final int WRITE_BUFF_LOW = 8 * 1024; /** - * this can be changed to a more specialized value, if necessary + * The number of threads used for the worker threads. */ @Property - public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime() - .availableProcessors() * 2; + public static int WORKER_THREAD_POOL_SIZE = Math.min(Runtime.getRuntime().availableProcessors() / 2, 1); /** * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. diff --git a/src/dorkbox/network/connection/registration/RegistrationHandler.java b/src/dorkbox/network/connection/registration/RegistrationHandler.java index 6ca500aa..0aa774d0 100644 --- a/src/dorkbox/network/connection/registration/RegistrationHandler.java +++ b/src/dorkbox/network/connection/registration/RegistrationHandler.java @@ -32,6 +32,11 @@ class RegistrationHandler extends ChannelInboundHandlerAdapter { protected final String name; protected final EventLoopGroup workerEventLoop; + /** + * @param name + * @param registrationWrapper + * @param workerEventLoop can be null for local JVM connections + */ public RegistrationHandler(final String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) { this.name = name; diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java index c4059eed..432dd1fd 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java @@ -20,15 +20,14 @@ import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.RegistrationHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; import io.netty.util.AttributeKey; public abstract class RegistrationLocalHandler extends RegistrationHandler { public static final AttributeKey META_CHANNEL = AttributeKey.valueOf(RegistrationLocalHandler.class, "MetaChannel.local"); - RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) { - super(name, registrationWrapper, workerEventLoop); + RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper) { + super(name, registrationWrapper, null); } /** diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java index a77b140c..6a31a154 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerClient.java @@ -22,15 +22,14 @@ import dorkbox.network.connection.registration.Registration; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; public class RegistrationLocalHandlerClient extends RegistrationLocalHandler { public - RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) { - super(name, registrationWrapper, workerEventLoop); + RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper) { + super(name, registrationWrapper); } /** diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java index 866bc013..0779cb49 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandlerServer.java @@ -21,15 +21,14 @@ import dorkbox.network.connection.registration.MetaChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; public class RegistrationLocalHandlerServer extends RegistrationLocalHandler { public - RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) { - super(name, registrationWrapper, workerEventLoop); + RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper) { + super(name, registrationWrapper); } /** diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index fff65660..a820c2b7 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -34,13 +34,17 @@ import dorkbox.network.serialization.CryptoSerializationManager; import dorkbox.util.crypto.CryptoECC; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GenericFutureListener; public abstract @@ -349,84 +353,109 @@ class RegistrationRemoteHandler extends RegistrationHandler { } } - final void cleanupPipeline(final MetaChannel metaChannel) { + final + void cleanupPipeline(final MetaChannel metaChannel, final long delay) { final int idleTimeout = this.registrationWrapper.getIdleTimeout(); try { // REMOVE our channel wrapper (only used for encryption) with the actual connection metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection; - if (metaChannel.tcpChannel != null) { - final ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline(); - if (registrationWrapper.isClient()) { - pipeline.remove(RegistrationRemoteHandlerClientTCP.class); - } - else { - pipeline.remove(RegistrationRemoteHandlerServerTCP.class); - } - pipeline.remove(ConnectionWrapper.class); - - if (idleTimeout > 0) { - pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS)); - } else { - pipeline.remove(IDLE_HANDLER); - } - - pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection); - - // we also DEREGISTER and run on a different event loop! - ChannelFuture future = metaChannel.tcpChannel.deregister(); - future.addListener(new GenericFutureListener>() { - @Override - public - void operationComplete(final Future f) throws Exception { - if (f.isSuccess()) { - workerEventLoop.register(metaChannel.tcpChannel); - } - } - }); + cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.tcpChannel, true); } if (metaChannel.udpChannel != null) { - final ChannelPipeline pipeline = metaChannel.udpChannel.pipeline(); - if (registrationWrapper.isClient()) { - pipeline.remove(RegistrationRemoteHandlerClientUDP.class); - } - else { - pipeline.remove(RegistrationRemoteHandlerServerUDP.class); - } - pipeline.remove(ConnectionWrapper.class); - - if (idleTimeout > 0) { - pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS)); - } - else { - pipeline.remove(IDLE_HANDLER); - } - - pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection); - - // we also DEREGISTER and run on a different event loop! - // ONLY necessary for UDP-CLIENT, because for UDP-SERVER, the SessionManager takes care of this! - // if (registrationWrapper.isClient()) { - // ChannelFuture future = metaChannel.udpChannel.deregister(); - // future.addListener(new GenericFutureListener>() { - // @Override - // public - // void operationComplete(final Future f) throws Exception { - // if (f.isSuccess()) { - // workerEventLoop.register(metaChannel.udpChannel); - // } - // } - // }); - // } + cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.udpChannel, false); } } catch (Exception e) { logger.error("Error during pipeline replace", e); } } + private + void cleanupPipeline0(final long delay, + final int idleTimeout, + final MetaChannel metaChannel, + final ChannelHandler connection, + final Channel channel, + final boolean isTcp) { + final ChannelPipeline pipeline = channel.pipeline(); + + boolean isClient = registrationWrapper.isClient(); + if (isClient) { + if (isTcp) { + pipeline.remove(RegistrationRemoteHandlerClientTCP.class); + } + else { + pipeline.remove(RegistrationRemoteHandlerClientUDP.class); + } + } + else { + if (isTcp) { + pipeline.remove(RegistrationRemoteHandlerServerTCP.class); + } + else { + pipeline.remove(RegistrationRemoteHandlerServerUDP.class); + } + } + + pipeline.remove(ConnectionWrapper.class); + + if (idleTimeout > 0) { + pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS)); + } + else { + pipeline.remove(IDLE_HANDLER); + } + + pipeline.addLast(CONNECTION_HANDLER, connection); + + // we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop! + // if (isClient) { + ChannelFuture future = channel.deregister(); + future.addListener(new GenericFutureListener>() { + @Override + public + void operationComplete(final Future f) throws Exception { + if (f.isSuccess()) { + // final EventLoop next = workerEventLoop.next(); + + final ChannelPromise channelPromise = channel.newPromise(); + channelPromise.addListener(new FutureListener() { + @Override + public + void operationComplete(final Future future) throws Exception { + EventLoop loop = channel.eventLoop(); + loop.schedule(new Runnable() { + @Override + public + void run() { + logger.trace("Notify Connection"); + doConnect(metaChannel); + } + }, delay, TimeUnit.MILLISECONDS); + } + }); + + // TODO: TCP and UDP have to register on DIFFERENT event loops + workerEventLoop.register(channelPromise); + } + } + }); + // } + // else { + // channel.eventLoop().schedule(new Runnable() { + // @Override + // public + // void run() { + // logger.trace("Notify Connection"); + // doConnect(metaChannel); + // } + // }, delay, TimeUnit.MILLISECONDS); + // } + } + final void doConnect(final MetaChannel metaChannel) { // safe cast, because it's always this way... diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java index 3faeecfc..20097073 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java @@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote; import java.math.BigInteger; import java.net.InetSocketAddress; import java.util.Arrays; -import java.util.concurrent.TimeUnit; import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; @@ -215,15 +214,6 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // remove the ConnectionWrapper (that was used to upgrade the connection) - cleanupPipeline(metaChannel); - - workerEventLoop.schedule(new Runnable() { - @Override - public - void run() { - logger.trace("Notify Connection"); - doConnect(metaChannel); - } - }, 20, TimeUnit.MILLISECONDS); + cleanupPipeline(metaChannel, 20); } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java index a3d60b6a..919f9e44 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java @@ -224,19 +224,10 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { // remove the ConnectionWrapper (that was used to upgrade the connection) - cleanupPipeline(metaChannel); + // wait for a "round trip" amount of time, then notify the APP! + cleanupPipeline(metaChannel, delay); // this tells the client we are ready to connect (we just bounce back the original message) channel.writeAndFlush(registration); - - // wait for a "round trip" amount of time, then notify the APP! - workerEventLoop.schedule(new Runnable() { - @Override - public - void run() { - logger.trace("Notify Connection"); - doConnect(metaChannel); - } - }, delay, TimeUnit.MILLISECONDS); } }