Fixed issues with multiple worker threads
This commit is contained in:
parent
6156ac1569
commit
0af57379e6
|
@ -37,7 +37,6 @@ import dorkbox.util.exceptions.SecurityException;
|
|||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.epoll.EpollDatagramChannel;
|
||||
|
@ -108,8 +107,6 @@ class Client<C extends Connection> extends EndPointClient implements Connection
|
|||
|
||||
localChannelName = config.localChannelName;
|
||||
hostName = config.host;
|
||||
final EventLoopGroup workerEventLoop = newEventLoop(DEFAULT_THREAD_POOL_SIZE, threadName);
|
||||
|
||||
|
||||
if (config.localChannelName != null && config.tcpPort <= 0 && config.udpPort <= 0) {
|
||||
// no networked bootstraps. LOCAL connection only
|
||||
|
@ -119,7 +116,7 @@ class Client<C extends Connection> extends EndPointClient implements Connection
|
|||
localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"))
|
||||
.channel(LocalChannel.class)
|
||||
.remoteAddress(new LocalAddress(config.localChannelName))
|
||||
.handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper, workerEventLoop));
|
||||
.handler(new RegistrationLocalHandlerClient(threadName, registrationWrapper));
|
||||
}
|
||||
else {
|
||||
if (config.host == null) {
|
||||
|
@ -154,7 +151,8 @@ class Client<C extends Connection> extends EndPointClient implements Connection
|
|||
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
|
||||
.remoteAddress(config.host, config.tcpPort)
|
||||
.handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper, workerEventLoop));
|
||||
.handler(new RegistrationRemoteHandlerClientTCP(threadName, registrationWrapper,
|
||||
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
|
||||
|
||||
// android screws up on this!!
|
||||
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
|
||||
|
@ -190,7 +188,8 @@ class Client<C extends Connection> extends EndPointClient implements Connection
|
|||
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
|
||||
.localAddress(new InetSocketAddress(0)) // bind to wildcard
|
||||
.remoteAddress(new InetSocketAddress(config.host, config.udpPort))
|
||||
.handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper, workerEventLoop));
|
||||
.handler(new RegistrationRemoteHandlerClientUDP(threadName, registrationWrapper,
|
||||
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
|
||||
|
||||
// Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
|
||||
// in order to WRITE: write as normal, just make sure it ends in .255
|
||||
|
|
|
@ -132,22 +132,22 @@ class DnsServer extends Shutdownable {
|
|||
if (OS.isAndroid()) {
|
||||
// android ONLY supports OIO (not NIO)
|
||||
boss = new OioEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||
work = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
work = new OioEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
}
|
||||
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
||||
// epoll network stack is MUCH faster (but only on linux)
|
||||
boss = new EpollEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||
work = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
work = new EpollEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
}
|
||||
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
||||
// KQueue network stack is MUCH faster (but only on macosx)
|
||||
boss = new KQueueEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||
work = new KQueueEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
work = new KQueueEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
}
|
||||
else {
|
||||
// sometimes the native libraries cannot be loaded, so fall back to NIO
|
||||
boss = new NioEventLoopGroup(1, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||
work = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
work = new NioEventLoopGroup(WORKER_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||
}
|
||||
|
||||
manageForShutdown(boss);
|
||||
|
|
|
@ -34,7 +34,6 @@ import io.netty.bootstrap.SessionBootstrap;
|
|||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.epoll.EpollDatagramChannel;
|
||||
|
@ -147,18 +146,16 @@ class Server<C extends Connection> extends EndPointServer {
|
|||
|
||||
|
||||
String threadName = Server.class.getSimpleName();
|
||||
final EventLoopGroup workerEventLoop = newEventLoop(DEFAULT_THREAD_POOL_SIZE, threadName);
|
||||
|
||||
|
||||
// always use local channels on the server.
|
||||
if (localBootstrap != null) {
|
||||
localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"),
|
||||
newEventLoop(LOCAL, 1, threadName + "-JVM-HAND"))
|
||||
newEventLoop(LOCAL, 1, threadName ))
|
||||
.channel(LocalServerChannel.class)
|
||||
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
|
||||
.localAddress(new LocalAddress(localChannelName))
|
||||
.childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper, workerEventLoop));
|
||||
.childHandler(new RegistrationLocalHandlerServer(threadName, registrationWrapper));
|
||||
}
|
||||
|
||||
// don't even bother with TCP/UDP if it's not enabled
|
||||
|
@ -196,7 +193,8 @@ class Server<C extends Connection> extends EndPointServer {
|
|||
|
||||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper, workerEventLoop));
|
||||
.childHandler(new RegistrationRemoteHandlerServerTCP(threadName, registrationWrapper,
|
||||
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
|
||||
|
||||
// have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address!
|
||||
if (hostName.equals("0.0.0.0")) {
|
||||
|
@ -245,7 +243,8 @@ class Server<C extends Connection> extends EndPointServer {
|
|||
// TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
|
||||
// OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
|
||||
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
|
||||
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper, workerEventLoop));
|
||||
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper,
|
||||
newEventLoop(WORKER_THREAD_POOL_SIZE, threadName)));
|
||||
|
||||
// // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
|
||||
// if (hostName.equals("0.0.0.0")) {
|
||||
|
|
|
@ -72,11 +72,10 @@ class Shutdownable {
|
|||
public static final int WRITE_BUFF_LOW = 8 * 1024;
|
||||
|
||||
/**
|
||||
* this can be changed to a more specialized value, if necessary
|
||||
* The number of threads used for the worker threads.
|
||||
*/
|
||||
@Property
|
||||
public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime()
|
||||
.availableProcessors() * 2;
|
||||
public static int WORKER_THREAD_POOL_SIZE = Math.min(Runtime.getRuntime().availableProcessors() / 2, 1);
|
||||
|
||||
/**
|
||||
* The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully.
|
||||
|
|
|
@ -32,6 +32,11 @@ class RegistrationHandler extends ChannelInboundHandlerAdapter {
|
|||
protected final String name;
|
||||
protected final EventLoopGroup workerEventLoop;
|
||||
|
||||
/**
|
||||
* @param name
|
||||
* @param registrationWrapper
|
||||
* @param workerEventLoop can be null for local JVM connections
|
||||
*/
|
||||
public
|
||||
RegistrationHandler(final String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) {
|
||||
this.name = name;
|
||||
|
|
|
@ -20,15 +20,14 @@ import dorkbox.network.connection.registration.MetaChannel;
|
|||
import dorkbox.network.connection.registration.RegistrationHandler;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.AttributeKey;
|
||||
|
||||
public abstract
|
||||
class RegistrationLocalHandler extends RegistrationHandler {
|
||||
public static final AttributeKey<MetaChannel> META_CHANNEL = AttributeKey.valueOf(RegistrationLocalHandler.class, "MetaChannel.local");
|
||||
|
||||
RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) {
|
||||
super(name, registrationWrapper, workerEventLoop);
|
||||
RegistrationLocalHandler(String name, RegistrationWrapper registrationWrapper) {
|
||||
super(name, registrationWrapper, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,15 +22,14 @@ import dorkbox.network.connection.registration.Registration;
|
|||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
|
||||
public
|
||||
class RegistrationLocalHandlerClient extends RegistrationLocalHandler {
|
||||
|
||||
public
|
||||
RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) {
|
||||
super(name, registrationWrapper, workerEventLoop);
|
||||
RegistrationLocalHandlerClient(String name, RegistrationWrapper registrationWrapper) {
|
||||
super(name, registrationWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,15 +21,14 @@ import dorkbox.network.connection.registration.MetaChannel;
|
|||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
|
||||
public
|
||||
class RegistrationLocalHandlerServer extends RegistrationLocalHandler {
|
||||
|
||||
public
|
||||
RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) {
|
||||
super(name, registrationWrapper, workerEventLoop);
|
||||
RegistrationLocalHandlerServer(String name, RegistrationWrapper registrationWrapper) {
|
||||
super(name, registrationWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,13 +34,17 @@ import dorkbox.network.serialization.CryptoSerializationManager;
|
|||
import dorkbox.util.crypto.CryptoECC;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
public abstract
|
||||
|
@ -349,82 +353,107 @@ class RegistrationRemoteHandler extends RegistrationHandler {
|
|||
}
|
||||
}
|
||||
|
||||
final void cleanupPipeline(final MetaChannel metaChannel) {
|
||||
final
|
||||
void cleanupPipeline(final MetaChannel metaChannel, final long delay) {
|
||||
final int idleTimeout = this.registrationWrapper.getIdleTimeout();
|
||||
|
||||
try {
|
||||
// REMOVE our channel wrapper (only used for encryption) with the actual connection
|
||||
metaChannel.connection = ((ConnectionWrapper) metaChannel.connection).connection;
|
||||
|
||||
|
||||
if (metaChannel.tcpChannel != null) {
|
||||
final ChannelPipeline pipeline = metaChannel.tcpChannel.pipeline();
|
||||
if (registrationWrapper.isClient()) {
|
||||
cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.tcpChannel, true);
|
||||
}
|
||||
|
||||
if (metaChannel.udpChannel != null) {
|
||||
cleanupPipeline0(delay, idleTimeout, metaChannel, metaChannel.connection, metaChannel.udpChannel, false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error during pipeline replace", e);
|
||||
}
|
||||
}
|
||||
|
||||
private
|
||||
void cleanupPipeline0(final long delay,
|
||||
final int idleTimeout,
|
||||
final MetaChannel metaChannel,
|
||||
final ChannelHandler connection,
|
||||
final Channel channel,
|
||||
final boolean isTcp) {
|
||||
final ChannelPipeline pipeline = channel.pipeline();
|
||||
|
||||
boolean isClient = registrationWrapper.isClient();
|
||||
if (isClient) {
|
||||
if (isTcp) {
|
||||
pipeline.remove(RegistrationRemoteHandlerClientTCP.class);
|
||||
}
|
||||
else {
|
||||
pipeline.remove(RegistrationRemoteHandlerClientUDP.class);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (isTcp) {
|
||||
pipeline.remove(RegistrationRemoteHandlerServerTCP.class);
|
||||
}
|
||||
else {
|
||||
pipeline.remove(RegistrationRemoteHandlerServerUDP.class);
|
||||
}
|
||||
}
|
||||
|
||||
pipeline.remove(ConnectionWrapper.class);
|
||||
|
||||
if (idleTimeout > 0) {
|
||||
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS));
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
pipeline.remove(IDLE_HANDLER);
|
||||
}
|
||||
|
||||
pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection);
|
||||
pipeline.addLast(CONNECTION_HANDLER, connection);
|
||||
|
||||
// we also DEREGISTER and run on a different event loop!
|
||||
ChannelFuture future = metaChannel.tcpChannel.deregister();
|
||||
// we also DEREGISTER from the HANDSHAKE event-loop and run on the worker event-loop!
|
||||
// if (isClient) {
|
||||
ChannelFuture future = channel.deregister();
|
||||
future.addListener(new GenericFutureListener<Future<? super Void>>() {
|
||||
@Override
|
||||
public
|
||||
void operationComplete(final Future<? super Void> f) throws Exception {
|
||||
if (f.isSuccess()) {
|
||||
workerEventLoop.register(metaChannel.tcpChannel);
|
||||
// final EventLoop next = workerEventLoop.next();
|
||||
|
||||
final ChannelPromise channelPromise = channel.newPromise();
|
||||
channelPromise.addListener(new FutureListener<Void>() {
|
||||
@Override
|
||||
public
|
||||
void operationComplete(final Future<Void> future) throws Exception {
|
||||
EventLoop loop = channel.eventLoop();
|
||||
loop.schedule(new Runnable() {
|
||||
@Override
|
||||
public
|
||||
void run() {
|
||||
logger.trace("Notify Connection");
|
||||
doConnect(metaChannel);
|
||||
}
|
||||
}, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
});
|
||||
|
||||
// TODO: TCP and UDP have to register on DIFFERENT event loops
|
||||
workerEventLoop.register(channelPromise);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (metaChannel.udpChannel != null) {
|
||||
final ChannelPipeline pipeline = metaChannel.udpChannel.pipeline();
|
||||
if (registrationWrapper.isClient()) {
|
||||
pipeline.remove(RegistrationRemoteHandlerClientUDP.class);
|
||||
}
|
||||
else {
|
||||
pipeline.remove(RegistrationRemoteHandlerServerUDP.class);
|
||||
}
|
||||
pipeline.remove(ConnectionWrapper.class);
|
||||
|
||||
if (idleTimeout > 0) {
|
||||
pipeline.replace(IDLE_HANDLER, IDLE_HANDLER_FULL, new IdleStateHandler(0, 0, idleTimeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
else {
|
||||
pipeline.remove(IDLE_HANDLER);
|
||||
}
|
||||
|
||||
pipeline.addLast(CONNECTION_HANDLER, metaChannel.connection);
|
||||
|
||||
// we also DEREGISTER and run on a different event loop!
|
||||
// ONLY necessary for UDP-CLIENT, because for UDP-SERVER, the SessionManager takes care of this!
|
||||
// if (registrationWrapper.isClient()) {
|
||||
// ChannelFuture future = metaChannel.udpChannel.deregister();
|
||||
// future.addListener(new GenericFutureListener<Future<? super Void>>() {
|
||||
// }
|
||||
// else {
|
||||
// channel.eventLoop().schedule(new Runnable() {
|
||||
// @Override
|
||||
// public
|
||||
// void operationComplete(final Future<? super Void> f) throws Exception {
|
||||
// if (f.isSuccess()) {
|
||||
// workerEventLoop.register(metaChannel.udpChannel);
|
||||
// void run() {
|
||||
// logger.trace("Notify Connection");
|
||||
// doConnect(metaChannel);
|
||||
// }
|
||||
// }, delay, TimeUnit.MILLISECONDS);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error during pipeline replace", e);
|
||||
}
|
||||
}
|
||||
|
||||
final
|
||||
|
|
|
@ -18,7 +18,6 @@ package dorkbox.network.connection.registration.remote;
|
|||
import java.math.BigInteger;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.bouncycastle.crypto.BasicAgreement;
|
||||
import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement;
|
||||
|
@ -215,15 +214,6 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
|
|||
|
||||
|
||||
// remove the ConnectionWrapper (that was used to upgrade the connection)
|
||||
cleanupPipeline(metaChannel);
|
||||
|
||||
workerEventLoop.schedule(new Runnable() {
|
||||
@Override
|
||||
public
|
||||
void run() {
|
||||
logger.trace("Notify Connection");
|
||||
doConnect(metaChannel);
|
||||
}
|
||||
}, 20, TimeUnit.MILLISECONDS);
|
||||
cleanupPipeline(metaChannel, 20);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -224,19 +224,10 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
|
|||
|
||||
|
||||
// remove the ConnectionWrapper (that was used to upgrade the connection)
|
||||
cleanupPipeline(metaChannel);
|
||||
// wait for a "round trip" amount of time, then notify the APP!
|
||||
cleanupPipeline(metaChannel, delay);
|
||||
|
||||
// this tells the client we are ready to connect (we just bounce back the original message)
|
||||
channel.writeAndFlush(registration);
|
||||
|
||||
// wait for a "round trip" amount of time, then notify the APP!
|
||||
workerEventLoop.schedule(new Runnable() {
|
||||
@Override
|
||||
public
|
||||
void run() {
|
||||
logger.trace("Notify Connection");
|
||||
doConnect(metaChannel);
|
||||
}
|
||||
}, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user