Added a more general ability for datagram sessions -- it's based on the
bootstrap implementation instead of a custom "Server" channel.
This commit is contained in:
parent
b0799c6846
commit
d021340878
@ -29,13 +29,28 @@ import dorkbox.util.OS;
|
|||||||
import dorkbox.util.Property;
|
import dorkbox.util.Property;
|
||||||
import dorkbox.util.exceptions.SecurityException;
|
import dorkbox.util.exceptions.SecurityException;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.bootstrap.SessionBootstrap;
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.DefaultEventLoopGroup;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||||
|
import io.netty.channel.WriteBufferWaterMark;
|
||||||
|
import io.netty.channel.epoll.EpollDatagramChannel;
|
||||||
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
||||||
|
import io.netty.channel.epoll.EpollServerSocketChannel;
|
||||||
|
import io.netty.channel.kqueue.KQueueDatagramChannel;
|
||||||
|
import io.netty.channel.kqueue.KQueueEventLoopGroup;
|
||||||
|
import io.netty.channel.kqueue.KQueueServerSocketChannel;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalServerChannel;
|
import io.netty.channel.local.LocalServerChannel;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioServerDatagramChannel;
|
import io.netty.channel.oio.OioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioDatagramChannel;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.channel.socket.oio.OioDatagramChannel;
|
||||||
|
import io.netty.channel.socket.oio.OioServerSocketChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
|
* The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
|
||||||
@ -64,7 +79,7 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
|
|
||||||
private final ServerBootstrap localBootstrap;
|
private final ServerBootstrap localBootstrap;
|
||||||
private final ServerBootstrap tcpBootstrap;
|
private final ServerBootstrap tcpBootstrap;
|
||||||
private final ServerBootstrap udpBootstrap;
|
private final SessionBootstrap udpBootstrap;
|
||||||
|
|
||||||
private final int tcpPort;
|
private final int tcpPort;
|
||||||
private final int udpPort;
|
private final int udpPort;
|
||||||
@ -125,7 +140,7 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (udpPort > 0) {
|
if (udpPort > 0) {
|
||||||
udpBootstrap = new ServerBootstrap();
|
udpBootstrap = new SessionBootstrap();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
udpBootstrap = null;
|
udpBootstrap = null;
|
||||||
@ -138,25 +153,25 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
final EventLoopGroup boss;
|
final EventLoopGroup boss;
|
||||||
final EventLoopGroup worker;
|
final EventLoopGroup worker;
|
||||||
|
|
||||||
// if (OS.isAndroid()) {
|
if (OS.isAndroid()) {
|
||||||
// // android ONLY supports OIO (not NIO)
|
// android ONLY supports OIO (not NIO)
|
||||||
// boss = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
boss = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||||
// worker = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
worker = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||||
// }
|
}
|
||||||
// else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
||||||
// // JNI network stack is MUCH faster (but only on linux)
|
// JNI network stack is MUCH faster (but only on linux)
|
||||||
// boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||||
// worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||||
// }
|
}
|
||||||
// else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
||||||
// // KQueue network stack is MUCH faster (but only on macosx)
|
// KQueue network stack is MUCH faster (but only on macosx)
|
||||||
// boss = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
boss = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||||
// worker = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
worker = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||||
// }
|
}
|
||||||
// else {
|
else {
|
||||||
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup));
|
||||||
worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup));
|
||||||
// }
|
}
|
||||||
|
|
||||||
manageForShutdown(boss);
|
manageForShutdown(boss);
|
||||||
manageForShutdown(worker);
|
manageForShutdown(worker);
|
||||||
@ -185,21 +200,21 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (tcpBootstrap != null) {
|
if (tcpBootstrap != null) {
|
||||||
// if (OS.isAndroid()) {
|
if (OS.isAndroid()) {
|
||||||
// // android ONLY supports OIO (not NIO)
|
// android ONLY supports OIO (not NIO)
|
||||||
// tcpBootstrap.channel(OioServerSocketChannel.class);
|
tcpBootstrap.channel(OioServerSocketChannel.class);
|
||||||
// }
|
}
|
||||||
// else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
||||||
// // JNI network stack is MUCH faster (but only on linux)
|
// JNI network stack is MUCH faster (but only on linux)
|
||||||
// tcpBootstrap.channel(EpollServerSocketChannel.class);
|
tcpBootstrap.channel(EpollServerSocketChannel.class);
|
||||||
// }
|
}
|
||||||
// else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
||||||
// // KQueue network stack is MUCH faster (but only on macosx)
|
// KQueue network stack is MUCH faster (but only on macosx)
|
||||||
// tcpBootstrap.channel(KQueueServerSocketChannel.class);
|
tcpBootstrap.channel(KQueueServerSocketChannel.class);
|
||||||
// }
|
}
|
||||||
// else {
|
else {
|
||||||
tcpBootstrap.channel(NioServerSocketChannel.class);
|
tcpBootstrap.channel(NioServerSocketChannel.class);
|
||||||
// }
|
}
|
||||||
|
|
||||||
// TODO: If we use netty for an HTTP server,
|
// TODO: If we use netty for an HTTP server,
|
||||||
// Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
|
// Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
|
||||||
@ -230,23 +245,22 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
|
|
||||||
|
|
||||||
if (udpBootstrap != null) {
|
if (udpBootstrap != null) {
|
||||||
// if (OS.isAndroid()) {
|
if (OS.isAndroid()) {
|
||||||
// // android ONLY supports OIO (not NIO)
|
// android ONLY supports OIO (not NIO)
|
||||||
// udpBootstrap.channel(OioDatagramChannel.class);
|
udpBootstrap.channel(OioDatagramChannel.class);
|
||||||
// }
|
}
|
||||||
// else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
else if (OS.isLinux() && NativeLibrary.isAvailable()) {
|
||||||
// // JNI network stack is MUCH faster (but only on linux)
|
// JNI network stack is MUCH faster (but only on linux)
|
||||||
// udpBootstrap.channel(EpollDatagramChannel.class);
|
udpBootstrap.channel(EpollDatagramChannel.class);
|
||||||
// }
|
}
|
||||||
// else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
|
||||||
// // KQueue network stack is MUCH faster (but only on macosx)
|
// KQueue network stack is MUCH faster (but only on macosx)
|
||||||
// udpBootstrap.channel(KQueueDatagramChannel.class);
|
udpBootstrap.channel(KQueueDatagramChannel.class);
|
||||||
// }
|
}
|
||||||
// else {
|
else {
|
||||||
// windows and linux/mac that are incompatible with the native implementations
|
// windows and linux/mac that are incompatible with the native implementations
|
||||||
// udpBootstrap.channel(NioDatagramChannel.class);
|
udpBootstrap.channel(NioDatagramChannel.class);
|
||||||
// }
|
}
|
||||||
udpBootstrap.channel(NioServerDatagramChannel.class);
|
|
||||||
|
|
||||||
|
|
||||||
// Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify
|
// Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify
|
||||||
@ -260,11 +274,7 @@ class Server<C extends Connection> extends EndPointServer {
|
|||||||
// TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
|
// TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
|
||||||
// OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
|
// OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
|
||||||
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
|
.localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
|
||||||
|
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper));
|
||||||
.childHandler(new RegistrationRemoteHandlerServerUDP(threadName,
|
|
||||||
registrationWrapper));
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
|
// // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
|
||||||
// if (hostName.equals("0.0.0.0")) {
|
// if (hostName.equals("0.0.0.0")) {
|
||||||
|
@ -44,7 +44,7 @@ class KryoDecoderUdp extends MessageToMessageDecoder<Object> {
|
|||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
boolean acceptInboundMessage(final Object msg) throws Exception {
|
boolean acceptInboundMessage(final Object msg) throws Exception {
|
||||||
return msg instanceof ByteBuf || msg instanceof AddressedEnvelope;
|
return msg instanceof AddressedEnvelope;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -74,15 +74,7 @@ class KryoDecoderUdp extends MessageToMessageDecoder<Object> {
|
|||||||
@Override
|
@Override
|
||||||
protected
|
protected
|
||||||
void decode(ChannelHandlerContext context, Object message, List<Object> out) throws Exception {
|
void decode(ChannelHandlerContext context, Object message, List<Object> out) throws Exception {
|
||||||
ByteBuf data;
|
ByteBuf data = (ByteBuf) ((AddressedEnvelope) message).content();
|
||||||
if (message instanceof AddressedEnvelope) {
|
|
||||||
// this is on the client
|
|
||||||
data = (ByteBuf) ((AddressedEnvelope) message).content();
|
|
||||||
} else {
|
|
||||||
// this is on the server
|
|
||||||
data = (ByteBuf) message;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -13,15 +13,18 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package io.netty.channel.socket.nio;
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.channel.AbstractChannel;
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.nio.NioEventLoop;
|
import io.netty.channel.ChannelMetadata;
|
||||||
|
import io.netty.channel.ChannelOutboundBuffer;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.socket.DatagramPacket;
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.internal.RecyclableArrayList;
|
import io.netty.util.internal.RecyclableArrayList;
|
||||||
@ -41,22 +44,27 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
|
|
||||||
|
|
||||||
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
|
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
|
||||||
protected final DatagramSessionChannelConfig config;
|
private final DatagramSessionChannelConfig config;
|
||||||
|
|
||||||
|
|
||||||
protected final NioServerDatagramChannel serverChannel;
|
private SessionManager sessionManager;
|
||||||
protected final InetSocketAddress remote;
|
private InetSocketAddress localAddress;
|
||||||
|
private InetSocketAddress remoteAddress;
|
||||||
|
|
||||||
private volatile boolean isOpen = true;
|
private volatile boolean isOpen = true;
|
||||||
private ByteBuf buffer;
|
|
||||||
|
|
||||||
protected
|
DatagramSessionChannel(final Channel parentChannel,
|
||||||
DatagramSessionChannel(NioServerDatagramChannel serverChannel, InetSocketAddress remote) {
|
final SessionManager sessionManager,
|
||||||
super(serverChannel);
|
final DatagramSessionChannelConfig sessionConfig,
|
||||||
this.serverChannel = serverChannel;
|
final InetSocketAddress localAddress,
|
||||||
this.remote = remote;
|
final InetSocketAddress remoteAddress) {
|
||||||
|
super(parentChannel);
|
||||||
|
|
||||||
config = new DatagramSessionChannelConfig(this, serverChannel);
|
this.sessionManager = sessionManager;
|
||||||
|
this.config = sessionConfig;
|
||||||
|
|
||||||
|
this.localAddress = localAddress;
|
||||||
|
this.remoteAddress = remoteAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -68,12 +76,6 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
@Override
|
@Override
|
||||||
protected
|
protected
|
||||||
void doBeginRead() throws Exception {
|
void doBeginRead() throws Exception {
|
||||||
// a single packet is 100% of our data, so we cannot have multiple reads (there is no "session" for UDP)
|
|
||||||
ChannelPipeline pipeline = pipeline();
|
|
||||||
|
|
||||||
pipeline.fireChannelRead(buffer);
|
|
||||||
pipeline.fireChannelReadComplete();
|
|
||||||
buffer = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -86,7 +88,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
protected
|
protected
|
||||||
void doClose() throws Exception {
|
void doClose() throws Exception {
|
||||||
isOpen = false;
|
isOpen = false;
|
||||||
serverChannel.doCloseChannel(this);
|
sessionManager.doCloseChannel(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -119,7 +121,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//schedule a task that will write those entries
|
//schedule a task that will write those entries
|
||||||
NioEventLoop eventLoop = serverChannel.eventLoop();
|
EventLoop eventLoop = parent().eventLoop();
|
||||||
if (eventLoop.inEventLoop()) {
|
if (eventLoop.inEventLoop()) {
|
||||||
write0(list);
|
write0(list);
|
||||||
}
|
}
|
||||||
@ -143,7 +145,8 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
@Override
|
@Override
|
||||||
protected
|
protected
|
||||||
boolean isCompatible(EventLoop eventloop) {
|
boolean isCompatible(EventLoop eventloop) {
|
||||||
return eventloop instanceof NioEventLoop;
|
// compatible with all Datagram event loops where we are explicitly used
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -161,7 +164,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
@Override
|
@Override
|
||||||
protected
|
protected
|
||||||
SocketAddress localAddress0() {
|
SocketAddress localAddress0() {
|
||||||
return serverChannel.localAddress0();
|
return localAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -180,24 +183,19 @@ class DatagramSessionChannel extends AbstractChannel implements Channel {
|
|||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
InetSocketAddress remoteAddress() {
|
InetSocketAddress remoteAddress() {
|
||||||
return remote;
|
return remoteAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected
|
protected
|
||||||
InetSocketAddress remoteAddress0() {
|
InetSocketAddress remoteAddress0() {
|
||||||
return remote;
|
return remoteAddress;
|
||||||
}
|
|
||||||
|
|
||||||
public
|
|
||||||
void setBuffer(final ByteBuf buffer) {
|
|
||||||
this.buffer = buffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private
|
private
|
||||||
void write0(final RecyclableArrayList list) {
|
void write0(final RecyclableArrayList list) {
|
||||||
try {
|
try {
|
||||||
NioUnsafe unsafe = serverChannel.unsafe();
|
Unsafe unsafe = super.parent().unsafe();
|
||||||
|
|
||||||
for (Object buf : list) {
|
for (Object buf : list) {
|
||||||
unsafe.write(buf, voidPromise());
|
unsafe.write(buf, voidPromise());
|
@ -13,13 +13,19 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package io.netty.channel.socket.nio;
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import dorkbox.network.connection.EndPoint;
|
import dorkbox.network.connection.EndPoint;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.*;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelConfig;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.DefaultMessageSizeEstimator;
|
||||||
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
|
import io.netty.channel.WriteBufferWaterMark;
|
||||||
import io.netty.channel.socket.DatagramChannelConfig;
|
import io.netty.channel.socket.DatagramChannelConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -31,14 +37,14 @@ public class DatagramSessionChannelConfig implements ChannelConfig {
|
|||||||
|
|
||||||
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
|
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
|
||||||
|
|
||||||
private final NioServerDatagramChannel serverDatagramSessionChannel;
|
private final Channel channel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance.
|
||||||
*/
|
*/
|
||||||
public
|
public
|
||||||
DatagramSessionChannelConfig(DatagramSessionChannel channel, final NioServerDatagramChannel serverDatagramSessionChannel) {
|
DatagramSessionChannelConfig(Channel channel) {
|
||||||
this.serverDatagramSessionChannel = serverDatagramSessionChannel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -56,7 +62,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig {
|
|||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
<T> T getOption(final ChannelOption<T> option) {
|
<T> T getOption(final ChannelOption<T> option) {
|
||||||
return serverDatagramSessionChannel.config().getOption(option);
|
return channel.config().getOption(option);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -104,8 +110,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig {
|
|||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
ByteBufAllocator getAllocator() {
|
ByteBufAllocator getAllocator() {
|
||||||
return serverDatagramSessionChannel.config()
|
return channel.config().getAllocator();
|
||||||
.getAllocator();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -117,8 +122,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig {
|
|||||||
@Override
|
@Override
|
||||||
public
|
public
|
||||||
<T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
|
<T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
|
||||||
return serverDatagramSessionChannel.config()
|
return channel.config().getRecvByteBufAllocator();
|
||||||
.getRecvByteBufAllocator();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
289
src/io/netty/bootstrap/SessionBootstrap.java
Normal file
289
src/io/netty/bootstrap/SessionBootstrap.java
Normal file
@ -0,0 +1,289 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 dorkbox, llc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.resolver.AddressResolverGroup;
|
||||||
|
import io.netty.resolver.DefaultAddressResolverGroup;
|
||||||
|
import io.netty.util.AttributeKey;
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VERY similar to the ServerBootstrap class, with the change to having a "SessionManager" added to the pipeline on init (instead of
|
||||||
|
* the ServerBootstrapAcceptor getting added)
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
class SessionBootstrap extends AbstractBootstrap<SessionBootstrap, Channel> {
|
||||||
|
private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
|
||||||
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SessionBootstrap.class);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static
|
||||||
|
Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
|
||||||
|
return new Entry[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static
|
||||||
|
Map.Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
|
||||||
|
return new Map.Entry[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
|
||||||
|
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
|
||||||
|
private final SessionBootstrapConfig config = new SessionBootstrapConfig(this);
|
||||||
|
private volatile EventLoopGroup childGroup;
|
||||||
|
private volatile ChannelHandler childHandler;
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
|
||||||
|
|
||||||
|
public
|
||||||
|
SessionBootstrap() { }
|
||||||
|
|
||||||
|
private
|
||||||
|
SessionBootstrap(SessionBootstrap bootstrap) {
|
||||||
|
super(bootstrap);
|
||||||
|
|
||||||
|
resolver = bootstrap.resolver;
|
||||||
|
|
||||||
|
childGroup = bootstrap.childGroup;
|
||||||
|
childHandler = bootstrap.childHandler;
|
||||||
|
|
||||||
|
synchronized (bootstrap.childOptions) {
|
||||||
|
childOptions.putAll(bootstrap.childOptions);
|
||||||
|
}
|
||||||
|
synchronized (bootstrap.childAttrs) {
|
||||||
|
childAttrs.putAll(bootstrap.childAttrs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the specific {@link AttributeKey} with the given value on every child {@link Channel}. If the value is
|
||||||
|
* {@code null} the {@link AttributeKey} is removed
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
<T> SessionBootstrap childAttr(AttributeKey<T> childKey, T value) {
|
||||||
|
if (childKey == null) {
|
||||||
|
throw new NullPointerException("childKey");
|
||||||
|
}
|
||||||
|
if (value == null) {
|
||||||
|
childAttrs.remove(childKey);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
childAttrs.put(childKey, value);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
final
|
||||||
|
Map<AttributeKey<?>, Object> childAttrs() {
|
||||||
|
return copiedMap(childAttrs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the configured {@link EventLoopGroup} which will be used for the child channels or {@code null}
|
||||||
|
* if non is configured yet.
|
||||||
|
*
|
||||||
|
* @deprecated Use {@link #config()} instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public
|
||||||
|
EventLoopGroup childGroup() {
|
||||||
|
return childGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
SessionBootstrap childHandler(ChannelHandler childHandler) {
|
||||||
|
if (childHandler == null) {
|
||||||
|
throw new NullPointerException("childHandler");
|
||||||
|
}
|
||||||
|
this.childHandler = childHandler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
final
|
||||||
|
ChannelHandler childHandler() {
|
||||||
|
return childHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
|
||||||
|
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
|
||||||
|
* {@link ChannelOption}.
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
<T> SessionBootstrap childOption(ChannelOption<T> childOption, T value) {
|
||||||
|
if (childOption == null) {
|
||||||
|
throw new NullPointerException("childOption");
|
||||||
|
}
|
||||||
|
if (value == null) {
|
||||||
|
synchronized (childOptions) {
|
||||||
|
childOptions.remove(childOption);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
synchronized (childOptions) {
|
||||||
|
childOptions.put(childOption, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
final
|
||||||
|
Map<ChannelOption<?>, Object> childOptions() {
|
||||||
|
return copiedMap(childOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("CloneDoesntCallSuperClone")
|
||||||
|
public
|
||||||
|
SessionBootstrap clone() {
|
||||||
|
return new SessionBootstrap(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final
|
||||||
|
SessionBootstrapConfig config() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
SessionBootstrap group(EventLoopGroup group) {
|
||||||
|
return group(group, group);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
|
||||||
|
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link DatagramSessionChannel} and
|
||||||
|
* {@link Channel}'s.
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
SessionBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
|
||||||
|
super.group(parentGroup);
|
||||||
|
if (childGroup == null) {
|
||||||
|
throw new NullPointerException("childGroup");
|
||||||
|
}
|
||||||
|
if (this.childGroup != null) {
|
||||||
|
throw new IllegalStateException("childGroup set already");
|
||||||
|
}
|
||||||
|
this.childGroup = childGroup;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void init(Channel channel) throws Exception {
|
||||||
|
final Map<ChannelOption<?>, Object> options = options0();
|
||||||
|
synchronized (options) {
|
||||||
|
setChannelOptions(channel, options, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<AttributeKey<?>, Object> attrs = attrs0();
|
||||||
|
synchronized (attrs) {
|
||||||
|
for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
|
||||||
|
channel.attr(key)
|
||||||
|
.set(e.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ChannelPipeline p = channel.pipeline();
|
||||||
|
|
||||||
|
final EventLoopGroup currentChildGroup = childGroup;
|
||||||
|
final ChannelHandler currentChildHandler = childHandler;
|
||||||
|
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
|
||||||
|
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
|
||||||
|
|
||||||
|
|
||||||
|
synchronized (childOptions) {
|
||||||
|
currentChildOptions = childOptions.entrySet()
|
||||||
|
.toArray(newOptionArray(childOptions.size()));
|
||||||
|
}
|
||||||
|
synchronized (childAttrs) {
|
||||||
|
currentChildAttrs = childAttrs.entrySet()
|
||||||
|
.toArray(newAttrArray(childAttrs.size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
p.addLast(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void initChannel(final Channel ch) throws Exception {
|
||||||
|
final ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
ChannelHandler handler = config.handler();
|
||||||
|
if (handler != null) {
|
||||||
|
pipeline.addLast(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.eventLoop()
|
||||||
|
.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void run() {
|
||||||
|
pipeline.addLast(new SessionManager(ch,
|
||||||
|
currentChildGroup,
|
||||||
|
currentChildHandler,
|
||||||
|
currentChildOptions,
|
||||||
|
currentChildAttrs));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public
|
||||||
|
SocketAddress remoteAddress() {
|
||||||
|
// the nature of this is that WE do not have a remote address, but our child channels DO have a remote address
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public
|
||||||
|
AddressResolverGroup<?> resolver() {
|
||||||
|
return resolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
SessionBootstrap validate() {
|
||||||
|
super.validate();
|
||||||
|
if (childHandler == null) {
|
||||||
|
throw new IllegalStateException("childHandler not set");
|
||||||
|
}
|
||||||
|
if (childGroup == null) {
|
||||||
|
logger.warn("childGroup is not set. Using parentGroup instead.");
|
||||||
|
childGroup = config.group();
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
64
src/io/netty/bootstrap/SessionBootstrapConfig.java
Normal file
64
src/io/netty/bootstrap/SessionBootstrapConfig.java
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 dorkbox, llc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.resolver.AddressResolverGroup;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exposes the configuration of a {@link Bootstrap}.
|
||||||
|
*/
|
||||||
|
public final
|
||||||
|
class SessionBootstrapConfig extends AbstractBootstrapConfig<SessionBootstrap, Channel> {
|
||||||
|
|
||||||
|
SessionBootstrapConfig(SessionBootstrap bootstrap) {
|
||||||
|
super(bootstrap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the configured remote address or {@code null} if non is configured yet.
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
SocketAddress remoteAddress() {
|
||||||
|
return bootstrap.remoteAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the configured {@link AddressResolverGroup} or the default if non is configured yet.
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
AddressResolverGroup<?> resolver() {
|
||||||
|
return bootstrap.resolver();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
String toString() {
|
||||||
|
StringBuilder buf = new StringBuilder(super.toString());
|
||||||
|
buf.setLength(buf.length() - 1);
|
||||||
|
buf.append(", resolver: ")
|
||||||
|
.append(resolver());
|
||||||
|
SocketAddress remoteAddress = remoteAddress();
|
||||||
|
if (remoteAddress != null) {
|
||||||
|
buf.append(", remoteAddress: ")
|
||||||
|
.append(remoteAddress);
|
||||||
|
}
|
||||||
|
return buf.append(')')
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
245
src/io/netty/bootstrap/SessionManager.java
Normal file
245
src/io/netty/bootstrap/SessionManager.java
Normal file
@ -0,0 +1,245 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 dorkbox, llc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import dorkbox.network.pipeline.discovery.BroadcastServer;
|
||||||
|
import dorkbox.util.bytes.BigEndian.Long_;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelConfig;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
|
import io.netty.util.AttributeKey;
|
||||||
|
import io.netty.util.collection.LongObjectHashMap;
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
class SessionManager extends ChannelInboundHandlerAdapter {
|
||||||
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SessionBootstrap.class);
|
||||||
|
|
||||||
|
private static
|
||||||
|
void forceClose(DatagramSessionChannel child, Throwable t) throws Exception {
|
||||||
|
child.unsafe()
|
||||||
|
.closeForcibly();
|
||||||
|
|
||||||
|
child.doClose();
|
||||||
|
|
||||||
|
logger.warn("Failed to register an accepted channel: {}", child, t);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static
|
||||||
|
long getChannelId(final InetSocketAddress remoteAddress) {
|
||||||
|
int address = remoteAddress.getAddress()
|
||||||
|
.hashCode(); // we want it as an int
|
||||||
|
|
||||||
|
int port = remoteAddress.getPort(); // this is really just 2 bytes
|
||||||
|
|
||||||
|
byte[] combined = new byte[8];
|
||||||
|
combined[0] = (byte) ((port >>> 24) & 0xFF);
|
||||||
|
combined[1] = (byte) ((port >>> 16) & 0xFF);
|
||||||
|
combined[2] = (byte) ((port >>> 8) & 0xFF);
|
||||||
|
combined[3] = (byte) ((port) & 0xFF);
|
||||||
|
combined[4] = (byte) ((address >>> 24) & 0xFF);
|
||||||
|
combined[5] = (byte) ((address >>> 16) & 0xFF);
|
||||||
|
combined[6] = (byte) ((address >>> 8) & 0xFF);
|
||||||
|
combined[7] = (byte) ((address) & 0xFF);
|
||||||
|
|
||||||
|
return Long_.from(combined);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final BroadcastServer broadcastServer = new BroadcastServer();
|
||||||
|
// Does not need to be thread safe, because access only happens in the event loop
|
||||||
|
private final LongObjectHashMap<DatagramSessionChannel> datagramChannels = new LongObjectHashMap<DatagramSessionChannel>();
|
||||||
|
|
||||||
|
|
||||||
|
private final EventLoopGroup childGroup;
|
||||||
|
private final ChannelHandler childHandler;
|
||||||
|
|
||||||
|
private final Entry<ChannelOption<?>, Object>[] childOptions;
|
||||||
|
private final Entry<AttributeKey<?>, Object>[] childAttrs;
|
||||||
|
|
||||||
|
private final Runnable enableAutoReadTask;
|
||||||
|
private final DatagramSessionChannelConfig sessionConfig;
|
||||||
|
|
||||||
|
|
||||||
|
SessionManager(final Channel channel,
|
||||||
|
EventLoopGroup childGroup,
|
||||||
|
ChannelHandler childHandler,
|
||||||
|
Entry<ChannelOption<?>, Object>[] childOptions,
|
||||||
|
Entry<AttributeKey<?>, Object>[] childAttrs) {
|
||||||
|
|
||||||
|
this.sessionConfig = new DatagramSessionChannelConfig(channel);
|
||||||
|
|
||||||
|
this.childGroup = childGroup;
|
||||||
|
this.childHandler = childHandler;
|
||||||
|
this.childOptions = childOptions;
|
||||||
|
this.childAttrs = childAttrs;
|
||||||
|
|
||||||
|
// Task which is scheduled to re-enable auto-read.
|
||||||
|
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
|
||||||
|
// not be able to load the class because of the file limit it already reached.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/1328
|
||||||
|
enableAutoReadTask = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void run() {
|
||||||
|
channel.config()
|
||||||
|
.setAutoRead(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void channelInactive(final ChannelHandlerContext ctx) throws Exception {
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
|
||||||
|
// have to close all of the "fake" DatagramChannels as well. Each one will remove itself from the channel map.
|
||||||
|
|
||||||
|
// We make a copy of this b/c of concurrent modification, in the event this is closed BEFORE the child-channels are closed
|
||||||
|
ArrayList<DatagramSessionChannel> channels = new ArrayList<DatagramSessionChannel>(datagramChannels.values());
|
||||||
|
for (DatagramSessionChannel datagramSessionChannel : channels) {
|
||||||
|
datagramSessionChannel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public
|
||||||
|
void channelRead(ChannelHandlerContext context, Object msg) {
|
||||||
|
Channel channel = context.channel();
|
||||||
|
|
||||||
|
DatagramPacket packet = ((DatagramPacket) msg);
|
||||||
|
|
||||||
|
ByteBuf content = packet.content();
|
||||||
|
InetSocketAddress localAddress = packet.recipient();
|
||||||
|
InetSocketAddress remoteAddress = packet.sender();
|
||||||
|
|
||||||
|
// check to see if it's a broadcast packet or not
|
||||||
|
if (broadcastServer.isBroadcast(channel, content, localAddress, remoteAddress)) {
|
||||||
|
// don't bother creating channels if this is a broadcast event. Just respond and be finished
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
long channelId = getChannelId(remoteAddress);
|
||||||
|
|
||||||
|
// create a new channel or reuse existing one
|
||||||
|
DatagramSessionChannel sessionChannel = datagramChannels.get(channelId);
|
||||||
|
ChannelPipeline sessionPipeline;
|
||||||
|
if (sessionChannel == null) {
|
||||||
|
try {
|
||||||
|
sessionChannel = new DatagramSessionChannel(context.channel(), this, sessionConfig, localAddress, remoteAddress);
|
||||||
|
datagramChannels.put(channelId, sessionChannel);
|
||||||
|
|
||||||
|
sessionPipeline = sessionChannel.pipeline();
|
||||||
|
|
||||||
|
// add the child handler to the fake channel
|
||||||
|
sessionPipeline.addLast(childHandler);
|
||||||
|
|
||||||
|
// setup the channel options
|
||||||
|
AbstractBootstrap.setChannelOptions(sessionChannel, childOptions, logger);
|
||||||
|
|
||||||
|
for (Entry<AttributeKey<?>, Object> e : childAttrs) {
|
||||||
|
sessionChannel.attr((AttributeKey<Object>) e.getKey())
|
||||||
|
.set(e.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final DatagramSessionChannel finalSessionChannel = sessionChannel;
|
||||||
|
childGroup.register(sessionChannel)
|
||||||
|
.addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
forceClose(finalSessionChannel, future.cause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Throwable t) {
|
||||||
|
forceClose(sessionChannel, t);
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Failed to create a new datagram channel from a read operation.", t);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (sessionChannel != null) {
|
||||||
|
sessionChannel.close();
|
||||||
|
}
|
||||||
|
} catch (Throwable t2) {
|
||||||
|
logger.warn("Failed to close the datagram channel.", t2);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sessionPipeline = sessionChannel.pipeline();
|
||||||
|
}
|
||||||
|
|
||||||
|
// immediately trigger a read in the session pipeline
|
||||||
|
sessionPipeline.fireChannelRead(packet);
|
||||||
|
|
||||||
|
// will flush the pipeline if necessary
|
||||||
|
sessionPipeline.fireChannelReadComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ADDED to support closing a DatagramSessionChannel. Always called from the EventLoop
|
||||||
|
*/
|
||||||
|
public
|
||||||
|
void doCloseChannel(final DatagramSessionChannel datagramSessionChannel) {
|
||||||
|
InetSocketAddress remoteAddress = datagramSessionChannel.remoteAddress();
|
||||||
|
long channelId = getChannelId(remoteAddress);
|
||||||
|
|
||||||
|
datagramChannels.remove(channelId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public
|
||||||
|
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
final ChannelConfig config = ctx.channel()
|
||||||
|
.config();
|
||||||
|
if (config.isAutoRead()) {
|
||||||
|
// stop accept new connections for 1 second to allow the channel to recover
|
||||||
|
// See https://github.com/netty/netty/issues/1328
|
||||||
|
config.setAutoRead(false);
|
||||||
|
ctx.channel()
|
||||||
|
.eventLoop()
|
||||||
|
.schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
// still let the exceptionCaught event flow through the pipeline to give the user
|
||||||
|
// a chance to do something with it
|
||||||
|
ctx.fireExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
}
|
@ -1,492 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2018 dorkbox, llc.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.channel.socket.nio;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.net.SocketException;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.channels.SelectionKey;
|
|
||||||
import java.nio.channels.spi.SelectorProvider;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import dorkbox.network.pipeline.discovery.BroadcastServer;
|
|
||||||
import dorkbox.util.bytes.BigEndian.Long_;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.channel.*;
|
|
||||||
import io.netty.channel.nio.AbstractNioMessageChannel;
|
|
||||||
import io.netty.channel.socket.*;
|
|
||||||
import io.netty.util.collection.LongObjectHashMap;
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
|
||||||
import io.netty.util.internal.SocketUtils;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An NIO datagram {@link Channel} that sends and receives an
|
|
||||||
* {@link AddressedEnvelope AddressedEnvelope<ByteBuf, SocketAddress>}.
|
|
||||||
*
|
|
||||||
* @see AddressedEnvelope
|
|
||||||
* @see DatagramPacket
|
|
||||||
*/
|
|
||||||
public final
|
|
||||||
class NioServerDatagramChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
|
|
||||||
|
|
||||||
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
|
|
||||||
private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
|
|
||||||
|
|
||||||
private static final String EXPECTED_TYPES = " (expected: " +
|
|
||||||
StringUtil.simpleClassName(DatagramPacket.class) + ", " +
|
|
||||||
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
|
|
||||||
StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
|
||||||
StringUtil.simpleClassName(SocketAddress.class) + ">, " +
|
|
||||||
StringUtil.simpleClassName(ByteBuf.class) + ')';
|
|
||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerDatagramChannel.class);
|
|
||||||
|
|
||||||
|
|
||||||
private static
|
|
||||||
java.nio.channels.DatagramChannel newSocket(SelectorProvider provider) {
|
|
||||||
try {
|
|
||||||
/**
|
|
||||||
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
|
|
||||||
* {@link SelectorProvider#provider()} which is called by each DatagramSessionChannel.open() otherwise.
|
|
||||||
*
|
|
||||||
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
|
|
||||||
*/
|
|
||||||
return provider.openDatagramChannel();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException("Failed to open a socket.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static
|
|
||||||
java.nio.channels.DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) {
|
|
||||||
if (ipFamily == null) {
|
|
||||||
return newSocket(provider);
|
|
||||||
}
|
|
||||||
|
|
||||||
checkJavaVersion();
|
|
||||||
|
|
||||||
try {
|
|
||||||
return NioServerDatagramChannel7.newSocket(provider, ipFamily);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ChannelException("Failed to open a socket.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static
|
|
||||||
void checkJavaVersion() {
|
|
||||||
if (PlatformDependent.javaVersion() < 7) {
|
|
||||||
throw new UnsupportedOperationException("Only supported on java 7+.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer.
|
|
||||||
* (We check this because otherwise we need to make it a non-composite buffer.)
|
|
||||||
*/
|
|
||||||
private static
|
|
||||||
boolean isSingleDirectBuffer(ByteBuf buf) {
|
|
||||||
return buf.isDirect() && buf.nioBufferCount() == 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static
|
|
||||||
long getChannelId(final InetSocketAddress remoteAddress) {
|
|
||||||
int address = remoteAddress.getAddress()
|
|
||||||
.hashCode(); // we want it as an int
|
|
||||||
int port = remoteAddress.getPort(); // this is really just 2 bytes
|
|
||||||
|
|
||||||
byte[] combined = new byte[8];
|
|
||||||
combined[0] = (byte) ((port >>> 24) & 0xFF);
|
|
||||||
combined[1] = (byte) ((port >>> 16) & 0xFF);
|
|
||||||
combined[2] = (byte) ((port >>> 8) & 0xFF);
|
|
||||||
combined[3] = (byte) ((port) & 0xFF);
|
|
||||||
combined[4] = (byte) ((address >>> 24) & 0xFF);
|
|
||||||
combined[5] = (byte) ((address >>> 16) & 0xFF);
|
|
||||||
combined[6] = (byte) ((address >>> 8) & 0xFF);
|
|
||||||
combined[7] = (byte) ((address) & 0xFF);
|
|
||||||
|
|
||||||
return Long_.from(combined);
|
|
||||||
}
|
|
||||||
private final ServerSocketChannelConfig config;
|
|
||||||
|
|
||||||
|
|
||||||
// Does not need to be thread safe, because access only happens in the event loop
|
|
||||||
private final LongObjectHashMap<DatagramSessionChannel> datagramChannels = new LongObjectHashMap<DatagramSessionChannel>();
|
|
||||||
private BroadcastServer broadcastServer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}.
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
NioServerDatagramChannel() {
|
|
||||||
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance using the given {@link SelectorProvider}
|
|
||||||
* which will use the Operation Systems default {@link InternetProtocolFamily}.
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
NioServerDatagramChannel(SelectorProvider provider) {
|
|
||||||
this(newSocket(provider));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
|
|
||||||
* on the Operation Systems default which will be chosen.
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
NioServerDatagramChannel(InternetProtocolFamily ipFamily) {
|
|
||||||
this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
|
|
||||||
* If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
|
|
||||||
* which will be chosen.
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
NioServerDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
|
|
||||||
this(newSocket(provider, ipFamily));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new instance from the given {@link java.nio.channels.DatagramChannel}.
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
NioServerDatagramChannel(java.nio.channels.DatagramChannel socket) {
|
|
||||||
super(null, socket, SelectionKey.OP_READ);
|
|
||||||
config = new NioServerDatagramChannelConfig(this, socket);
|
|
||||||
broadcastServer = new BroadcastServer();
|
|
||||||
}
|
|
||||||
|
|
||||||
void clearReadPending0() {
|
|
||||||
clearReadPending();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
boolean closeOnReadError(Throwable cause) {
|
|
||||||
// We do not want to close on SocketException when using DatagramSessionChannel as we usually can continue receiving.
|
|
||||||
// See https://github.com/netty/netty/issues/5893
|
|
||||||
if (cause instanceof SocketException) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return super.closeOnReadError(cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig config() {
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
boolean continueOnWriteError() {
|
|
||||||
// Continue on write error as a DatagramSessionChannel can write to multiple remote peers
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/2665
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
void doBind(SocketAddress localAddress) throws Exception {
|
|
||||||
doBind0(localAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
private
|
|
||||||
void doBind0(SocketAddress localAddress) throws Exception {
|
|
||||||
if (PlatformDependent.javaVersion() >= 7) {
|
|
||||||
SocketUtils.bind(javaChannel(), localAddress);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
javaChannel().socket()
|
|
||||||
.bind(localAddress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always called from the EventLoop
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
void doClose() throws Exception {
|
|
||||||
// have to close all of the fake DatagramChannels as well. Each one will remove itself from the channel map.
|
|
||||||
|
|
||||||
// We make a copy of this b/c of concurrent modification, in the event this is closed BEFORE the child-channels are closed
|
|
||||||
ArrayList<DatagramSessionChannel> channels = new ArrayList<DatagramSessionChannel>(datagramChannels.values());
|
|
||||||
for (DatagramSessionChannel datagramSessionChannel : channels) {
|
|
||||||
datagramSessionChannel.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
javaChannel().close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ADDED to support closing a DatagramSessionChannel. Always called from the EventLoop
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
void doCloseChannel(final DatagramSessionChannel datagramSessionChannel) {
|
|
||||||
InetSocketAddress remoteAddress = datagramSessionChannel.remoteAddress();
|
|
||||||
long channelId = getChannelId(remoteAddress);
|
|
||||||
|
|
||||||
datagramChannels.remove(channelId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
|
|
||||||
// Unnecessary stuff
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
void doDisconnect() throws Exception {
|
|
||||||
// Unnecessary stuff
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
void doFinishConnect() throws Exception {
|
|
||||||
// Unnecessary stuff
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
int doReadMessages(List<Object> buf) throws Exception {
|
|
||||||
java.nio.channels.DatagramChannel ch = javaChannel();
|
|
||||||
ServerSocketChannelConfig config = config();
|
|
||||||
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
|
|
||||||
ChannelPipeline pipeline = pipeline();
|
|
||||||
|
|
||||||
ByteBuf data = allocHandle.allocate(config.getAllocator());
|
|
||||||
allocHandle.attemptedBytesRead(data.writableBytes());
|
|
||||||
boolean free = true;
|
|
||||||
try {
|
|
||||||
ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
|
|
||||||
int pos = nioData.position();
|
|
||||||
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData);
|
|
||||||
if (remoteAddress == null) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
allocHandle.lastBytesRead(nioData.position() - pos);
|
|
||||||
ByteBuf byteBuf = data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
|
|
||||||
// original behavior from NioDatagramChannel.
|
|
||||||
// buf.add(new DatagramPacket(byteBuf, localAddress(), remoteAddress));
|
|
||||||
// free = false;
|
|
||||||
// return 1;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// new behavior
|
|
||||||
|
|
||||||
// check to see if it's a broadcast packet or not
|
|
||||||
ByteBuf broadcast = broadcastServer.getBroadcastResponse(byteBuf, remoteAddress);
|
|
||||||
if (broadcast != null) {
|
|
||||||
// don't bother creating channels if this is a broadcast event. Just respond and be finished
|
|
||||||
doWriteBytes(broadcast, remoteAddress);
|
|
||||||
// no messages created (since we directly write to the channel).
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
long channelId = getChannelId(remoteAddress);
|
|
||||||
|
|
||||||
// create a new channel or reuse existing one
|
|
||||||
DatagramSessionChannel channel = datagramChannels.get(channelId);
|
|
||||||
if (channel == null) {
|
|
||||||
try {
|
|
||||||
channel = new DatagramSessionChannel(this, remoteAddress);
|
|
||||||
datagramChannels.put(channelId, channel);
|
|
||||||
|
|
||||||
// This channel is registered automatically AFTER this read method completes
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.warn("Failed to create a new datagram channel from a read operation.", t);
|
|
||||||
|
|
||||||
try {
|
|
||||||
channel.close();
|
|
||||||
} catch (Throwable t2) {
|
|
||||||
logger.warn("Failed to close the datagram channel.", t2);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the bytes of the datagram channel
|
|
||||||
channel.setBuffer(byteBuf);
|
|
||||||
|
|
||||||
pipeline.fireChannelRead(channel);
|
|
||||||
|
|
||||||
// immediately trigger a read
|
|
||||||
channel.read();
|
|
||||||
|
|
||||||
free = false;
|
|
||||||
// we manually fireChannelRead + read (caller class calls readComplete for us)
|
|
||||||
return 0;
|
|
||||||
} catch (Throwable cause) {
|
|
||||||
PlatformDependent.throwException(cause);
|
|
||||||
return -1; // -1 means to close this channel
|
|
||||||
} finally {
|
|
||||||
if (free) {
|
|
||||||
data.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
|
|
||||||
final SocketAddress remoteAddress;
|
|
||||||
final ByteBuf data;
|
|
||||||
if (msg instanceof AddressedEnvelope) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
|
|
||||||
remoteAddress = envelope.recipient();
|
|
||||||
data = envelope.content();
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
data = (ByteBuf) msg;
|
|
||||||
remoteAddress = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
return doWriteBytes(data, remoteAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
private
|
|
||||||
boolean doWriteBytes(final ByteBuf data, final SocketAddress remoteAddress) throws IOException {
|
|
||||||
final int dataLen = data.readableBytes();
|
|
||||||
if (dataLen == 0) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
|
|
||||||
final int writtenBytes;
|
|
||||||
if (remoteAddress != null) {
|
|
||||||
writtenBytes = javaChannel().send(nioData, remoteAddress);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
writtenBytes = javaChannel().write(nioData);
|
|
||||||
}
|
|
||||||
return writtenBytes > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
Object filterOutboundMessage(Object msg) {
|
|
||||||
if (msg instanceof DatagramPacket) {
|
|
||||||
DatagramPacket p = (DatagramPacket) msg;
|
|
||||||
ByteBuf content = p.content();
|
|
||||||
if (isSingleDirectBuffer(content)) {
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
return new DatagramPacket(newDirectBuffer(p, content), p.recipient());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg instanceof ByteBuf) {
|
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
|
||||||
if (isSingleDirectBuffer(buf)) {
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
return newDirectBuffer(buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg instanceof AddressedEnvelope) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
|
|
||||||
if (e.content() instanceof ByteBuf) {
|
|
||||||
ByteBuf content = (ByteBuf) e.content();
|
|
||||||
if (isSingleDirectBuffer(content)) {
|
|
||||||
return e;
|
|
||||||
}
|
|
||||||
return new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(newDirectBuffer(e, content), e.recipient());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public
|
|
||||||
boolean isActive() {
|
|
||||||
java.nio.channels.DatagramChannel ch = javaChannel();
|
|
||||||
// we do not support registration options
|
|
||||||
// return ch.isOpen() && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered() || ch.socket().isBound());
|
|
||||||
return ch.isOpen() || ch.socket()
|
|
||||||
.isBound();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
java.nio.channels.DatagramChannel javaChannel() {
|
|
||||||
return (java.nio.channels.DatagramChannel) super.javaChannel();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
InetSocketAddress localAddress() {
|
|
||||||
return (InetSocketAddress) super.localAddress();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
SocketAddress localAddress0() {
|
|
||||||
return javaChannel().socket()
|
|
||||||
.getLocalSocketAddress();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ChannelMetadata metadata() {
|
|
||||||
return METADATA;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
InetSocketAddress remoteAddress() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected
|
|
||||||
SocketAddress remoteAddress0() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
protected
|
|
||||||
void setReadPending(boolean readPending) {
|
|
||||||
super.setReadPending(readPending);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
String toString() {
|
|
||||||
return "NioServerDatagramChannel";
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,35 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2018 dorkbox, llc.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.channel.socket.nio;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.channels.DatagramChannel;
|
|
||||||
import java.nio.channels.spi.SelectorProvider;
|
|
||||||
|
|
||||||
import io.netty.channel.socket.InternetProtocolFamily;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* For Java7+ only!
|
|
||||||
*/
|
|
||||||
public
|
|
||||||
class NioServerDatagramChannel7 {
|
|
||||||
// NOTE: this is suppressed because we compile this for java7, and everything else for java6, and this is only called if we are java7+
|
|
||||||
@SuppressWarnings("Since15")
|
|
||||||
public static
|
|
||||||
DatagramChannel newSocket(final SelectorProvider provider, final InternetProtocolFamily ipFamily) throws IOException {
|
|
||||||
return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily));
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,171 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2018 dorkbox, llc.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.channel.socket.nio;
|
|
||||||
|
|
||||||
import java.net.SocketException;
|
|
||||||
import java.nio.channels.DatagramChannel;
|
|
||||||
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.*;
|
|
||||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is a basic implementation of a ChannelConfig, with the exception that we take in a DatagramSessionChannel, and modify only the
|
|
||||||
* options of that channel that make sense
|
|
||||||
*/
|
|
||||||
public final
|
|
||||||
class NioServerDatagramChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig {
|
|
||||||
private final DatagramChannel datagramChannel;
|
|
||||||
|
|
||||||
public
|
|
||||||
NioServerDatagramChannelConfig(NioServerDatagramChannel channel, DatagramChannel datagramChannel) {
|
|
||||||
super(channel);
|
|
||||||
this.datagramChannel = datagramChannel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
int getBacklog() {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setBacklog(final int backlog) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
boolean isReuseAddress() {
|
|
||||||
try {
|
|
||||||
return datagramChannel.socket()
|
|
||||||
.getReuseAddress();
|
|
||||||
} catch (SocketException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setReuseAddress(final boolean reuseAddress) {
|
|
||||||
try {
|
|
||||||
datagramChannel.socket()
|
|
||||||
.setReuseAddress(true);
|
|
||||||
} catch (SocketException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
int getReceiveBufferSize() {
|
|
||||||
try {
|
|
||||||
return datagramChannel.socket()
|
|
||||||
.getReceiveBufferSize();
|
|
||||||
} catch (SocketException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setReceiveBufferSize(final int receiveBufferSize) {
|
|
||||||
try {
|
|
||||||
datagramChannel.socket()
|
|
||||||
.setReceiveBufferSize(receiveBufferSize);
|
|
||||||
} catch (SocketException e) {
|
|
||||||
throw new ChannelException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setPerformancePreferences(final int connectionTime, final int latency, final int bandwidth) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setConnectTimeoutMillis(int timeout) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Deprecated
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setMaxMessagesPerRead(int n) {
|
|
||||||
super.setMaxMessagesPerRead(n);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setWriteSpinCount(int spincount) {
|
|
||||||
super.setWriteSpinCount(spincount);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setAllocator(ByteBufAllocator alloc) {
|
|
||||||
super.setAllocator(alloc);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator alloc) {
|
|
||||||
super.setRecvByteBufAllocator(alloc);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setAutoRead(boolean autoread) {
|
|
||||||
super.setAutoRead(autoread);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
|
||||||
return (ServerSocketChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
|
||||||
return (ServerSocketChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
|
|
||||||
return (ServerSocketChannelConfig) super.setWriteBufferWaterMark(writeBufferWaterMark);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public
|
|
||||||
ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator est) {
|
|
||||||
super.setMessageSizeEstimator(est);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user