WIP DnsServer + Server Handlers

This commit is contained in:
nathan 2018-01-11 21:14:46 +01:00
parent 46bb211bf8
commit 71111c4e3c
10 changed files with 508 additions and 68 deletions

View File

@ -0,0 +1,245 @@
package dorkbox.network;
import org.slf4j.Logger;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.EndPointBase;
import dorkbox.network.dns.serverHandlers.DnsServerHandler;
import dorkbox.util.NamedThreadFactory;
import dorkbox.util.OS;
import dorkbox.util.Property;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
/**
*
*/
public
class DnsServer extends EndPoint {
/**
* The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when the
* queue is full, the connection is refused.
*/
@Property
public static int backlogConnectionCount = 50;
private final ServerBootstrap tcpBootstrap;
private final Bootstrap udpBootstrap;
private final int tcpPort;
private final int udpPort;
private final String hostName;
public
DnsServer(String host, int port) {
super(DnsServer.class);
tcpPort = port;
udpPort = port;
if (host == null) {
hostName = "0.0.0.0";
}
else {
hostName = host;
}
String threadName = DnsServer.class.getSimpleName();
final EventLoopGroup boss;
final EventLoopGroup worker;
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
boss = new OioEventLoopGroup(0, new NamedThreadFactory(threadName + "-boss", threadGroup));
worker = new OioEventLoopGroup(0, new NamedThreadFactory(threadName, threadGroup));
}
else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux)
boss = new EpollEventLoopGroup(EndPointBase.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
worker = new EpollEventLoopGroup(EndPointBase.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
}
else {
boss = new NioEventLoopGroup(EndPointBase.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
worker = new NioEventLoopGroup(EndPointBase.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
}
manageForShutdown(boss);
manageForShutdown(worker);
tcpBootstrap = new ServerBootstrap();
udpBootstrap = new Bootstrap();
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
tcpBootstrap.channel(OioServerSocketChannel.class);
}
else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux)
tcpBootstrap.channel(EpollServerSocketChannel.class);
}
else {
tcpBootstrap.channel(NioServerSocketChannel.class);
}
// TODO: If we use netty for an HTTP server,
// Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
tcpBootstrap.group(boss, worker)
.option(ChannelOption.SO_BACKLOG, backlogConnectionCount)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(EndPointBase.WRITE_BUFF_LOW, EndPointBase.WRITE_BUFF_HIGH))
.childHandler(new DnsServerHandler());
// have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
if (hostName != null) {
tcpBootstrap.localAddress(hostName, tcpPort);
}
else {
tcpBootstrap.localAddress(tcpPort);
}
// android screws up on this!!
tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
.childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid());
if (OS.isAndroid()) {
// android ONLY supports OIO (not NIO)
udpBootstrap.channel(OioDatagramChannel.class);
}
else if (OS.isLinux()) {
// JNI network stack is MUCH faster (but only on linux)
udpBootstrap.channel(EpollDatagramChannel.class)
.option(EpollChannelOption.SO_REUSEPORT, true);
}
else {
udpBootstrap.channel(NioDatagramChannel.class);
}
udpBootstrap.group(worker)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(EndPointBase.WRITE_BUFF_LOW, EndPointBase.WRITE_BUFF_HIGH))
// not binding to specific address, since it's driven by TCP, and that can be bound to a specific address
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets!
.handler(new DnsServerHandler());
}
/**
* Binds the server to the configured, underlying protocols.
* <p/>
* This method will also BLOCK until the stop method is called, and if you want to continue running code after this method invocation,
* bind should be called in a separate, non-daemon thread.
*/
public
void bind() {
bind(true);
}
/**
* Binds the server to the configured, underlying protocols.
* <p/>
* This is a more advanced method, and you should consider calling <code>bind()</code> instead.
*
* @param blockUntilTerminate will BLOCK until the server stop method is called, and if you want to continue running code after this method
* invocation, bind should be called in a separate, non-daemon thread - or with false as the parameter.
*/
@SuppressWarnings("AutoBoxing")
public
void bind(boolean blockUntilTerminate) {
// make sure we are not trying to connect during a close or stop event.
// This will wait until we have finished starting up/shutting down.
synchronized (shutdownInProgress) {
}
// The bootstraps will be accessed ONE AT A TIME, in this order!
ChannelFuture future;
Logger logger2 = logger;
// TCP
// Wait until the connection attempt succeeds or fails.
// try {
// future = tcpBootstrap.bind();
// future.await();
// } catch (Exception e) {
// // String errorMessage = stopWithErrorMessage(logger2,
// // "Could not bind to address " + hostName + " TCP port " + tcpPort +
// // " on the server.",
// // e);
// // throw new IllegalArgumentException(errorMessage);
// throw new RuntimeException();
// }
//
// if (!future.isSuccess()) {
// // String errorMessage = stopWithErrorMessage(logger2,
// // "Could not bind to address " + hostName + " TCP port " + tcpPort +
// // " on the server.",
// // future.cause());
// // throw new IllegalArgumentException(errorMessage);
// throw new RuntimeException();
// }
//
// // logger2.info("Listening on address {} at TCP port: {}", hostName, tcpPort);
//
// manageForShutdown(future);
// UDP
// Wait until the connection attempt succeeds or fails.
try {
future = udpBootstrap.bind();
future.await();
} catch (Exception e) {
String errorMessage = stopWithErrorMessage(logger2,
"Could not bind to address " + hostName + " UDP port " + udpPort +
" on the server.",
e);
throw new IllegalArgumentException(errorMessage);
}
if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(logger2,
"Could not bind to address " + hostName + " UDP port " + udpPort +
" on the server.",
future.cause());
throw new IllegalArgumentException(errorMessage);
}
// logger2.info("Listening on address {} at UDP port: {}", hostName, udpPort);
manageForShutdown(future);
// we now BLOCK until the stop method is called.
// if we want to continue running code in the server, bind should be called in a separate, non-daemon thread.
if (blockUntilTerminate) {
waitForShutdown();
}
}
}

View File

@ -1,17 +0,0 @@
package dorkbox.network.dns.handlers;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.DatagramChannel;
public
class DnsHandler extends ChannelInitializer<DatagramChannel> {
@Override
protected
void initChannel(DatagramChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DnsMessageEncoder());
pipeline.addLast(new DnsMessageDecoder());
// pipeline.addLast(new DNSMessageDecoder());
}
}

View File

@ -0,0 +1,180 @@
package dorkbox.network.dns.serverHandlers;
import java.net.InetSocketAddress;
import dorkbox.network.dns.records.DnsMessage;
import dorkbox.util.NamedThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.util.internal.PlatformDependent;
@ChannelHandler.Sharable
public
class DNSMessageDecoder extends ChannelInboundHandlerAdapter {
/**
* This is what is called whenever a DNS packet is received. Currently only support UDP packets.
*
* Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
* <p>
* Sub-classes may override this method to change behavior.
*/
@Override
public
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof DatagramPacket) {
ByteBuf content = ((DatagramPacket) msg).content();
if (content.readableBytes() == 0) {
// we can't read this message, there's nothing there!
System.err.println("NO CONTENT ");
ctx.fireChannelRead(msg);
return;
}
DnsMessage msg1 = new DnsMessage(content);
// should get one from a pool!
Bootstrap dnsBootstrap = new Bootstrap();
// setup the thread group to easily ID what the following threads belong to (and their spawned threads...)
SecurityManager s = System.getSecurityManager();
ThreadGroup nettyGroup = new ThreadGroup(s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(),
"DnsClient (Netty)");
EventLoopGroup group;
if (PlatformDependent.isAndroid()) {
group = new OioEventLoopGroup(0, new NamedThreadFactory("DnsClient-boss-UDP", nettyGroup));
dnsBootstrap.channel(OioDatagramChannel.class);
}
else {
group = new NioEventLoopGroup(2, new NamedThreadFactory("DnsClient-boss-UDP", nettyGroup));
dnsBootstrap.channel(NioDatagramChannel.class);
}
dnsBootstrap.group(group);
dnsBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// dnsBootstrap.handler(new DnsHandler());
// sending the question
final ChannelFuture future = dnsBootstrap.connect(new InetSocketAddress("8.8.8.8", 53));
try {
future.await();
if (future.isSuccess()) {
// woo, connected!
System.err.println("CONNECTED");
// this.dnsServer = dnsServer;
}
else {
System.err.println("CANNOT CONNECT!");
// this.dnsServer = null;
// Logger logger2 = this.logger;
// if (logger2.isDebugEnabled()) {
// logger2.error("Could not connect to the DNS server.", this.future.cause());
// }
// else {
// logger2.error("Could not connect to the DNS server.");
// }
}
} catch (Exception e) {
e.printStackTrace();
// Logger logger2 = this.logger;
// if (logger2.isDebugEnabled()) {
// logger2.error("Could not connect to the DNS server on port {}.", dnsServer.getPort(), e.getCause());
// }
// else {
// logger2.error("Could not connect to the DNS server on port {}.", dnsServer.getPort());
// }
}
//
// ClientBootstrap cb = new ClientBootstrap(this.clientChannelFactory);
// cb.setOption("broadcast", "false");
//
// cb.setPipelineFactory(new ChannelPipelineFactory() {
// @Override
// public
// ChannelPipeline getPipeline() throws Exception {
// return Channels.pipeline(new ClientHanler(original, e.getChannel(), e.getRemoteAddress()));
// }
// });
//
// List<SocketAddress> newlist = new ArrayList<SocketAddress>(this.config.getForwarders());
// sendRequest(e, original, cb, newlist);
}
else {
ctx.fireChannelRead(msg);
}
}
// protected
// void sendRequest(final MessageEvent e, final DNSMessage original, final ClientBootstrap bootstrap, final List<SocketAddress> forwarders) {
//
// if (0 < forwarders.size()) {
// SocketAddress sa = forwarders.remove(0);
// LOG.debug("send to {}", sa);
//
// ChannelFuture f = bootstrap.connect(sa);
// ChannelBuffer newone = ChannelBuffers.buffer(512);
// DNSMessage msg = new DNSMessage(original);
// msg.write(newone);
// newone.resetReaderIndex();
// final Channel c = f.getChannel();
//
// if (LOG.isDebugEnabled()) {
// LOG.debug("STATUS : [isOpen/isConnected/isWritable {}] {} {}",
// new Object[] {new boolean[] {c.isOpen(), c.isConnected(), c.isWritable()}, c.getRemoteAddress(), c.getClass()});
// }
//
// c.write(newone, sa).addListener(new ChannelFutureListener() {
// @Override
// public
// void operationComplete(ChannelFuture future) throws Exception {
// LOG.debug("request complete isSuccess : {}", future.isSuccess());
// if (future.isSuccess() == false) {
// if (0 < forwarders.size()) {
// sendRequest(e, original, bootstrap, forwarders);
// }
// else {
// original.header().rcode(RCode.ServFail);
// ChannelBuffer buffer = ChannelBuffers.buffer(512);
// original.write(buffer);
// // close inbound channel
// e.getChannel().write(buffer).addListener(ChannelFutureListener.CLOSE);
// }
// }
// }
// });
//
// // f.awaitUninterruptibly(30, TimeUnit.SECONDS);
// }
// }
}

View File

@ -1,9 +1,8 @@
package dorkbox.network.dns.handlers;
package dorkbox.network.dns.serverHandlers;
import java.util.List;
import org.handwerkszeug.dns.DNSMessage;
import dorkbox.network.dns.records.DnsMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
@ -34,7 +33,8 @@ class DnsMessageDecoder extends MessageToMessageDecoder<DatagramPacket> {
boolean success = false;
try {
DNSMessage dnsMessage = new DNSMessage(buf);
DnsMessage dnsMessage = new DnsMessage(buf);
dnsMessage.retain();
out.add(dnsMessage);
success = true;
} finally {
@ -42,5 +42,5 @@ class DnsMessageDecoder extends MessageToMessageDecoder<DatagramPacket> {
buf.release();
}
}
}
}
}

View File

@ -1,4 +1,4 @@
package dorkbox.network.dns.handlers;
package dorkbox.network.dns.serverHandlers;
import dorkbox.network.dns.DnsOutput;
import dorkbox.network.dns.records.DnsMessage;

View File

@ -0,0 +1,41 @@
package dorkbox.network.dns.serverHandlers;
import dorkbox.network.dns.DnsResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
public class DnsResponseHandler extends SimpleChannelInboundHandler<DnsResponse> {
private Promise<Object> promise;
public DnsResponseHandler(Promise<Object> promise) {
this.promise = promise;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DnsResponse msg) throws Exception {
System.err.println("YOP! " + msg);
// DnsResponseCode errorCode = msg.header().responseCode();
//
// if (errorCode == DnsResponseCode.NOERROR) {
// RecordDecoderFactory factory = RecordDecoderFactory.getFactory();
//
// List<DnsResource> resources = msg.answers();
// List<Object> records = new ArrayList<>(resources.size());
// for (DnsResource resource : resources) {
// Object record = factory.decode(msg, resource);
// records.add(record);
// }
// this.promise.setSuccess(records);
// } else {
// this.promise.setFailure(new DnsException(errorCode));
// }
// ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
this.promise.setFailure(cause);
ctx.close();
}
}

View File

@ -1,6 +1,7 @@
package dorkbox.network.dns.handlers;
package dorkbox.network.dns.serverHandlers;
import org.handwerkszeug.dns.server.DNSMessageDecoder;
import org.slf4j.Logger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -12,23 +13,24 @@ import io.netty.channel.ChannelPipeline;
*/
public
class DnsServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(DnsServerHandler.class);
protected DNSMessageDecoder decoder = new DNSMessageDecoder();
protected final DnsMessageDecoder decoder = new DnsMessageDecoder();
public
DnsServerHandler(final String threadName) {
DnsServerHandler() {
}
@Override
public final
void channelRegistered(final ChannelHandlerContext context) throws Exception {
void channelRegistered(final ChannelHandlerContext context) {
boolean success = false;
try {
initChannel(context.channel());
context.fireChannelRegistered();
success = true;
} catch (Throwable t) {
// this.logger.error("Failed to initialize a channel. Closing: {}", context.channel(), t);
LOG.error("Failed to initialize a channel. Closing: {}", context.channel(), t);
} finally {
if (!success) {
context.close();

View File

@ -1,4 +1,4 @@
package dorkbox.network.dns.handlers;
package dorkbox.network.dns.serverHandlers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,6 +24,7 @@ public class ForwardingHandler extends ChannelOutboundHandlerAdapter {
@Override
public
void read(final ChannelHandlerContext ctx) throws Exception {
System.err.println("FORWARD HANDLER READ");
super.read(ctx);
}
@ -103,7 +104,7 @@ public class ForwardingHandler extends ChannelOutboundHandlerAdapter {
}
protected class ClientHanler extends ChannelInboundHandlerAdapter {
protected class ClientHandler extends ChannelInboundHandlerAdapter {
// protected DNSMessage original;
//

View File

@ -0,0 +1,25 @@
package dorkbox.network.dns.serverHandlers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.Promise;
public class SuccessHandler implements ChannelFutureListener {
private Promise<Object> promise;
public SuccessHandler(Promise<Object> promise) {
this.promise = promise;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.err.println("SUCCESS COMPLETE");
if (future.isSuccess()) {
future.channel().pipeline().addLast(new DnsResponseHandler(this.promise));
} else {
if (!future.isDone()) {
this.promise.setFailure(future.cause());
}
}
}
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.resolver.dns;
import io.netty.buffer.ByteBuf;
import io.netty.resolver.ResolvedAddressTypes;
public final
class DnsNameResolverAccess {
public static
String decodeDomainName(final ByteBuf byteBuff) {
return DnsNameResolverContext.decodeDomainName(byteBuff);
}
public static
ResolvedAddressTypes getDefaultResolvedAddressTypes() {
return DnsNameResolver.DEFAULT_RESOLVE_ADDRESS_TYPES;
}
private
DnsNameResolverAccess() {
}
}