Cleancode polish + LOW/HIGH WATERMARK to client/server

This commit is contained in:
nathan 2016-02-28 03:11:17 +01:00
parent 8d1d43616a
commit 04bc273111
2 changed files with 168 additions and 154 deletions

View File

@ -114,21 +114,35 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
: Thread.currentThread() : Thread.currentThread()
.getThreadGroup(), threadName + " (Netty)"); .getThreadGroup(), threadName + " (Netty)");
final EventLoopGroup boss;
if (isAndroid) {
// android ONLY supports OIO (not NIO)
boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName, nettyGroup));
}
else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, nettyGroup));
}
else {
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, nettyGroup));
}
manageForShutdown(boss);
if (options.localChannelName != null && options.tcpPort < 0 && options.udpPort < 0 && options.udtPort < 0) { if (options.localChannelName != null && options.tcpPort < 0 && options.udpPort < 0 && options.udtPort < 0) {
// no networked bootstraps. LOCAL connection only // no networked bootstraps. LOCAL connection only
Bootstrap localBootstrap = new Bootstrap(); Bootstrap localBootstrap = new Bootstrap();
this.bootstraps.add(new BootstrapWrapper("LOCAL", -1, localBootstrap)); this.bootstraps.add(new BootstrapWrapper("LOCAL", -1, localBootstrap));
EventLoopGroup boss; EventLoopGroup localBoss = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-LOCAL", nettyGroup));
boss = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-LOCAL", nettyGroup)); localBootstrap.group(localBoss)
localBootstrap.group(boss)
.channel(LocalChannel.class) .channel(LocalChannel.class)
.remoteAddress(new LocalAddress(options.localChannelName)) .remoteAddress(new LocalAddress(options.localChannelName))
.handler(new RegistrationLocalHandlerClient<C>(threadName, this.registrationWrapper)); .handler(new RegistrationLocalHandlerClient<C>(threadName, registrationWrapper));
manageForShutdown(boss); manageForShutdown(localBoss);
} }
else { else {
if (options.host == null) { if (options.host == null) {
@ -143,36 +157,30 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
Bootstrap tcpBootstrap = new Bootstrap(); Bootstrap tcpBootstrap = new Bootstrap();
this.bootstraps.add(new BootstrapWrapper("TCP", options.tcpPort, tcpBootstrap)); this.bootstraps.add(new BootstrapWrapper("TCP", options.tcpPort, tcpBootstrap));
EventLoopGroup boss;
if (isAndroid) { if (isAndroid) {
// android ONLY supports OIO (not NIO) // android ONLY supports OIO (not NIO)
boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-TCP", nettyGroup));
tcpBootstrap.channel(OioSocketChannel.class); tcpBootstrap.channel(OioSocketChannel.class);
} }
else if (OS.isLinux()) { else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-TCP", nettyGroup));
tcpBootstrap.channel(EpollSocketChannel.class); tcpBootstrap.channel(EpollSocketChannel.class);
} }
else { else {
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-TCP", nettyGroup));
tcpBootstrap.channel(NioSocketChannel.class); tcpBootstrap.channel(NioSocketChannel.class);
} }
tcpBootstrap.group(boss) tcpBootstrap.group(boss)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
.remoteAddress(options.host, options.tcpPort) .remoteAddress(options.host, options.tcpPort)
.handler(new RegistrationRemoteHandlerClientTCP<C>(threadName, .handler(new RegistrationRemoteHandlerClientTCP<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
manageForShutdown(boss);
// android screws up on this!! // android screws up on this!!
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !isAndroid); tcpBootstrap.option(ChannelOption.TCP_NODELAY, !isAndroid)
tcpBootstrap.option(ChannelOption.SO_KEEPALIVE, true); .option(ChannelOption.SO_KEEPALIVE, true);
} }
@ -180,32 +188,27 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
Bootstrap udpBootstrap = new Bootstrap(); Bootstrap udpBootstrap = new Bootstrap();
this.bootstraps.add(new BootstrapWrapper("UDP", options.udpPort, udpBootstrap)); this.bootstraps.add(new BootstrapWrapper("UDP", options.udpPort, udpBootstrap));
EventLoopGroup boss;
if (isAndroid) { if (isAndroid) {
// android ONLY supports OIO (not NIO) // android ONLY supports OIO (not NIO)
boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-UDP", nettyGroup));
udpBootstrap.channel(OioDatagramChannel.class); udpBootstrap.channel(OioDatagramChannel.class);
} }
else if (OS.isLinux()) { else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-UDP", nettyGroup));
udpBootstrap.channel(EpollDatagramChannel.class); udpBootstrap.channel(EpollDatagramChannel.class);
} }
else { else {
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-UDP", nettyGroup));
udpBootstrap.channel(NioDatagramChannel.class); udpBootstrap.channel(NioDatagramChannel.class);
} }
udpBootstrap.group(boss) udpBootstrap.group(boss)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
.localAddress(new InetSocketAddress(0)) .localAddress(new InetSocketAddress(0))
.remoteAddress(new InetSocketAddress(options.host, options.udpPort)) .remoteAddress(new InetSocketAddress(options.host, options.udpPort))
.handler(new RegistrationRemoteHandlerClientUDP<C>(threadName, .handler(new RegistrationRemoteHandlerClientUDP<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
manageForShutdown(boss);
// Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
// in order to WRITE: write as normal, just make sure it ends in .255 // in order to WRITE: write as normal, just make sure it ends in .255
@ -214,8 +217,8 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
// NioDatagramChannel.joinGroup(group); // NioDatagramChannel.joinGroup(group);
// THEN once done // THEN once done
// NioDatagramChannel.leaveGroup(group), close the socket // NioDatagramChannel.leaveGroup(group), close the socket
udpBootstrap.option(ChannelOption.SO_BROADCAST, false); udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
udpBootstrap.option(ChannelOption.SO_SNDBUF, udpMaxSize); .option(ChannelOption.SO_SNDBUF, udpMaxSize);
} }
@ -235,20 +238,20 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
Bootstrap udtBootstrap = new Bootstrap(); Bootstrap udtBootstrap = new Bootstrap();
this.bootstraps.add(new BootstrapWrapper("UDT", options.udtPort, udtBootstrap)); this.bootstraps.add(new BootstrapWrapper("UDT", options.udtPort, udtBootstrap));
EventLoopGroup boss; EventLoopGroup udtBoss = UdtEndpointProxy.getClientWorker(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup);
boss = UdtEndpointProxy.getClientWorker(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup);
UdtEndpointProxy.setChannelFactory(udtBootstrap); UdtEndpointProxy.setChannelFactory(udtBootstrap);
udtBootstrap.group(boss) udtBootstrap.group(udtBoss)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
.remoteAddress(options.host, options.udtPort) .remoteAddress(options.host, options.udtPort)
.handler(new RegistrationRemoteHandlerClientUDT<C>(threadName, .handler(new RegistrationRemoteHandlerClientUDT<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
manageForShutdown(boss); manageForShutdown(udtBoss);
} }
} }
} }
@ -390,6 +393,10 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
return this.connection.sendOnIdle(message); return this.connection.sendOnIdle(message);
} }
/**
* Marks the connection to be closed as soon as possible. This is evaluated when the current
* thread execution returns to the network stack.
*/
@Override @Override
public public
void closeAsap() { void closeAsap() {

View File

@ -93,7 +93,7 @@ class Server<C extends Connection> extends EndPointServer<C> {
// you have to make sure to use this.serialization // you have to make sure to use this.serialization
super(options); super(options);
Logger logger2 = this.logger; Logger logger2 = logger;
if (OS.isAndroid() && options.udtPort > 0) { if (OS.isAndroid() && options.udtPort > 0) {
// Android does not support UDT. // Android does not support UDT.
if (logger2.isInfoEnabled()) { if (logger2.isInfoEnabled()) {
@ -102,55 +102,55 @@ class Server<C extends Connection> extends EndPointServer<C> {
options.udtPort = -1; options.udtPort = -1;
} }
this.tcpPort = options.tcpPort; tcpPort = options.tcpPort;
this.udpPort = options.udpPort; udpPort = options.udpPort;
this.udtPort = options.udtPort; udtPort = options.udtPort;
this.localChannelName = options.localChannelName; localChannelName = options.localChannelName;
if (this.localChannelName != null) { if (localChannelName != null) {
this.localBootstrap = new ServerBootstrap(); localBootstrap = new ServerBootstrap();
} }
else { else {
this.localBootstrap = null; localBootstrap = null;
} }
if (this.tcpPort > 0) { if (tcpPort > 0) {
this.tcpBootstrap = new ServerBootstrap(); tcpBootstrap = new ServerBootstrap();
} }
else { else {
this.tcpBootstrap = null; tcpBootstrap = null;
} }
if (this.udpPort > 0) { if (udpPort > 0) {
this.udpBootstrap = new Bootstrap(); udpBootstrap = new Bootstrap();
} }
else { else {
this.udpBootstrap = null; udpBootstrap = null;
} }
String threadName = Server.class.getSimpleName(); String threadName = Server.class.getSimpleName();
if (this.udtPort > 0) { if (udtPort > 0) {
// check to see if we have UDT available! // check to see if we have UDT available!
boolean udtAvailable = false; boolean udtAvailable = false;
try { try {
Class.forName("com.barchart.udt.nio.SelectorProviderUDT"); Class.forName("com.barchart.udt.nio.SelectorProviderUDT");
udtAvailable = true; udtAvailable = true;
} catch (Throwable e) { } catch (Throwable e) {
logger2.error("Requested a UDT service on port {}, but the barchart UDT libraries are not loaded.", this.udtPort); logger2.error("Requested a UDT service on port {}, but the barchart UDT libraries are not loaded.", udtPort);
} }
if (udtAvailable) { if (udtAvailable) {
this.udtBootstrap = new ServerBootstrap(); udtBootstrap = new ServerBootstrap();
} }
else { else {
this.udtBootstrap = null; udtBootstrap = null;
} }
} }
else { else {
this.udtBootstrap = null; udtBootstrap = null;
} }
// setup the thread group to easily ID what the following threads belong to (and their spawned threads...) // setup the thread group to easily ID what the following threads belong to (and their spawned threads...)
@ -160,111 +160,116 @@ class Server<C extends Connection> extends EndPointServer<C> {
: Thread.currentThread() : Thread.currentThread()
.getThreadGroup(), threadName + " (Netty)"); .getThreadGroup(), threadName + " (Netty)");
final EventLoopGroup boss;
// always use local channels on the server. final EventLoopGroup worker;
{
EventLoopGroup boss;
EventLoopGroup worker;
if (this.localBootstrap != null) {
boss = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss-LOCAL", nettyGroup));
worker = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-LOCAL",
nettyGroup));
this.localBootstrap.group(boss, worker)
.channel(LocalServerChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.localAddress(new LocalAddress(this.localChannelName))
.childHandler(new RegistrationLocalHandlerServer<C>(threadName, this.registrationWrapper));
manageForShutdown(boss);
manageForShutdown(worker);
}
}
if (this.tcpBootstrap != null) {
EventLoopGroup boss;
EventLoopGroup worker;
if (OS.isAndroid()) { if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO) // android ONLY supports OIO (not NIO)
boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-boss-TCP", nettyGroup)); boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-boss", nettyGroup));
worker = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-worker-TCP", nettyGroup)); worker = new OioEventLoopGroup(0, new NamedThreadFactory(threadName, nettyGroup));
this.tcpBootstrap.channel(OioServerSocketChannel.class);
} }
else if (OS.isLinux()) { else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss-TCP", nettyGroup)); boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", nettyGroup));
worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-TCP", nettyGroup)); worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, nettyGroup));
this.tcpBootstrap.channel(EpollServerSocketChannel.class);
} }
else { else {
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss-TCP", nettyGroup)); boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", nettyGroup));
worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-TCP", nettyGroup)); worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, nettyGroup));
}
this.tcpBootstrap.channel(NioServerSocketChannel.class); manageForShutdown(boss);
manageForShutdown(worker);
// always use local channels on the server.
{
EventLoopGroup localBoss;
EventLoopGroup localWorker;
if (localBootstrap != null) {
localBoss = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss-LOCAL", nettyGroup));
localWorker = new DefaultEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-LOCAL",
nettyGroup));
localBootstrap.group(localBoss, localWorker)
.channel(LocalServerChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
.localAddress(new LocalAddress(localChannelName))
.childHandler(new RegistrationLocalHandlerServer<C>(threadName, registrationWrapper));
manageForShutdown(localBoss);
manageForShutdown(localWorker);
}
}
if (tcpBootstrap != null) {
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
tcpBootstrap.channel(OioServerSocketChannel.class);
}
else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux)
tcpBootstrap.channel(EpollServerSocketChannel.class);
}
else {
tcpBootstrap.channel(NioServerSocketChannel.class);
} }
// TODO: If we use netty for an HTTP server, // TODO: If we use netty for an HTTP server,
// Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server. // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
manageForShutdown(boss); tcpBootstrap.group(boss, worker)
manageForShutdown(worker);
this.tcpBootstrap.group(boss, worker)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_BACKLOG, backlogConnectionCount) .option(ChannelOption.SO_BACKLOG, backlogConnectionCount)
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
.childHandler(new RegistrationRemoteHandlerServerTCP<C>(threadName, .childHandler(new RegistrationRemoteHandlerServerTCP<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
if (options.host != null) { if (options.host != null) {
this.tcpBootstrap.localAddress(options.host, this.tcpPort); tcpBootstrap.localAddress(options.host, tcpPort);
} }
else { else {
this.tcpBootstrap.localAddress(this.tcpPort); tcpBootstrap.localAddress(tcpPort);
} }
// android screws up on this!! // android screws up on this!!
this.tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()); tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
this.tcpBootstrap.childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid()); .childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid());
this.tcpBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
} }
if (this.udpBootstrap != null) { if (udpBootstrap != null) {
EventLoopGroup worker;
if (OS.isAndroid()) { if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO) // android ONLY supports OIO (not NIO)
worker = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-worker-UDP", nettyGroup)); udpBootstrap.channel(OioDatagramChannel.class);
this.udpBootstrap.channel(OioDatagramChannel.class);
} }
else if (OS.isLinux()) { else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux) // JNI network stack is MUCH faster (but only on linux)
worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-UDP", nettyGroup)); udpBootstrap.channel(EpollDatagramChannel.class)
this.udpBootstrap.channel(EpollDatagramChannel.class)
.option(EpollChannelOption.SO_REUSEPORT, true); .option(EpollChannelOption.SO_REUSEPORT, true);
} }
else { else {
worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-worker-UDP", nettyGroup)); udpBootstrap.channel(NioDatagramChannel.class);
this.udpBootstrap.channel(NioDatagramChannel.class);
} }
manageForShutdown(worker); udpBootstrap.group(worker)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
this.udpBootstrap.group(worker).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// not binding to specific address, since it's driven by TCP, and that can be bound to a specific address // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address
.localAddress(this.udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets!
.handler(new RegistrationRemoteHandlerServerUDP<C>(threadName, .handler(new RegistrationRemoteHandlerServerUDP<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
// Enable to READ from MULTICAST data (ie, 192.168.1.0) // Enable to READ from MULTICAST data (ie, 192.168.1.0)
@ -275,32 +280,34 @@ class Server<C extends Connection> extends EndPointServer<C> {
// THEN once done // THEN once done
// socket.leaveGroup(group), close the socket // socket.leaveGroup(group), close the socket
// Enable to WRITE to MULTICAST data (ie, 192.168.1.0) // Enable to WRITE to MULTICAST data (ie, 192.168.1.0)
this.udpBootstrap.option(ChannelOption.SO_BROADCAST, false); udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
this.udpBootstrap.option(ChannelOption.SO_SNDBUF, udpMaxSize); .option(ChannelOption.SO_SNDBUF, udpMaxSize);
} }
if (this.udtBootstrap != null) { if (udtBootstrap != null) {
EventLoopGroup boss; EventLoopGroup udtBoss;
EventLoopGroup worker; EventLoopGroup udtWorker;
// all of this must be proxied to another class, so THIS class doesn't have unmet dependencies. // all of this must be proxied to another class, so THIS class doesn't have unmet dependencies.
// Annoying and abusing the classloader, but it works well. // Annoying and abusing the classloader, but it works well.
boss = UdtEndpointProxy.getServerBoss(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup); udtBoss = UdtEndpointProxy.getServerBoss(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup);
worker = UdtEndpointProxy.getServerWorker(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup); udtWorker = UdtEndpointProxy.getServerWorker(DEFAULT_THREAD_POOL_SIZE, threadName, nettyGroup);
UdtEndpointProxy.setChannelFactory(this.udtBootstrap); UdtEndpointProxy.setChannelFactory(udtBootstrap);
this.udtBootstrap.group(boss, worker) udtBootstrap.group(udtBoss, udtWorker)
.option(ChannelOption.SO_BACKLOG, backlogConnectionCount).option(ChannelOption.ALLOCATOR, .option(ChannelOption.SO_BACKLOG, backlogConnectionCount)
PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFF_HIGH)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFF_LOW)
// not binding to specific address, since it's driven by TCP, and that can be bound to a specific address // not binding to specific address, since it's driven by TCP, and that can be bound to a specific address
.localAddress(this.udtPort) .localAddress(udtPort)
.childHandler(new RegistrationRemoteHandlerServerUDT<C>(threadName, .childHandler(new RegistrationRemoteHandlerServerUDT<C>(threadName,
this.registrationWrapper, registrationWrapper,
this.serializationManager)); serializationManager));
manageForShutdown(boss); manageForShutdown(udtBoss);
manageForShutdown(worker); manageForShutdown(udtWorker);
} }
} }
@ -329,7 +336,7 @@ class Server<C extends Connection> extends EndPointServer<C> {
void bind(boolean blockUntilTerminate) { void bind(boolean blockUntilTerminate) {
// make sure we are not trying to connect during a close or stop event. // make sure we are not trying to connect during a close or stop event.
// This will wait until we have finished starting up/shutting down. // This will wait until we have finished starting up/shutting down.
synchronized (this.shutdownInProgress) { synchronized (shutdownInProgress) {
} }
@ -337,10 +344,10 @@ class Server<C extends Connection> extends EndPointServer<C> {
ChannelFuture future; ChannelFuture future;
// LOCAL // LOCAL
Logger logger2 = this.logger; Logger logger2 = logger;
if (this.localBootstrap != null) { if (localBootstrap != null) {
try { try {
future = this.localBootstrap.bind(); future = localBootstrap.bind();
future.await(); future.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not bind to LOCAL address on the server.", e); String errorMessage = stopWithErrorMessage(logger2, "Could not bind to LOCAL address on the server.", e);
@ -352,74 +359,74 @@ class Server<C extends Connection> extends EndPointServer<C> {
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
logger2.info("Listening on LOCAL address: '{}'", this.localChannelName); logger2.info("Listening on LOCAL address: '{}'", localChannelName);
manageForShutdown(future); manageForShutdown(future);
} }
// TCP // TCP
if (this.tcpBootstrap != null) { if (tcpBootstrap != null) {
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
try { try {
future = this.tcpBootstrap.bind(); future = tcpBootstrap.bind();
future.await(); future.await();
} catch (Exception e) { } catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not bind to TCP port " + this.tcpPort + " on the server.", e); String errorMessage = stopWithErrorMessage(logger2, "Could not bind to TCP port " + tcpPort + " on the server.", e);
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2, String errorMessage = stopWithErrorMessage(logger2,
"Could not bind to TCP port " + this.tcpPort + " on the server.", "Could not bind to TCP port " + tcpPort + " on the server.",
future.cause()); future.cause());
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
logger2.info("Listening on TCP port: {}", this.tcpPort); logger2.info("Listening on TCP port: {}", tcpPort);
manageForShutdown(future); manageForShutdown(future);
} }
// UDP // UDP
if (this.udpBootstrap != null) { if (udpBootstrap != null) {
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
try { try {
future = this.udpBootstrap.bind(); future = udpBootstrap.bind();
future.await(); future.await();
} catch (Exception e) { } catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not bind to UDP port " + this.udpPort + " on the server.", e); String errorMessage = stopWithErrorMessage(logger2, "Could not bind to UDP port " + udpPort + " on the server.", e);
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2, String errorMessage = stopWithErrorMessage(logger2,
"Could not bind to UDP port " + this.udpPort + " on the server.", "Could not bind to UDP port " + udpPort + " on the server.",
future.cause()); future.cause());
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
logger2.info("Listening on UDP port: {}", this.udpPort); logger2.info("Listening on UDP port: {}", udpPort);
manageForShutdown(future); manageForShutdown(future);
} }
// UDT // UDT
if (this.udtBootstrap != null) { if (udtBootstrap != null) {
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
try { try {
future = this.udtBootstrap.bind(); future = udtBootstrap.bind();
future.await(); future.await();
} catch (Exception e) { } catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2, "Could not bind to UDT port " + this.udtPort + " on the server.", e); String errorMessage = stopWithErrorMessage(logger2, "Could not bind to UDT port " + udtPort + " on the server.", e);
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2, String errorMessage = stopWithErrorMessage(logger2,
"Could not bind to UDT port " + this.udtPort + " on the server.", "Could not bind to UDT port " + udtPort + " on the server.",
future.cause()); future.cause());
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
logger2.info("Listening on UDT port: {}", this.udtPort); logger2.info("Listening on UDT port: {}", udtPort);
manageForShutdown(future); manageForShutdown(future);
} }