diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index 07bc5f0b..3b5cab43 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -29,13 +29,28 @@ import dorkbox.util.OS; import dorkbox.util.Property; import dorkbox.util.exceptions.SecurityException; import io.netty.bootstrap.ServerBootstrap; +import io.netty.bootstrap.SessionBootstrap; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.kqueue.KQueueDatagramChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerDatagramChannel; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.oio.OioDatagramChannel; +import io.netty.channel.socket.oio.OioServerSocketChannel; /** * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the @@ -64,7 +79,7 @@ class Server extends EndPointServer { private final ServerBootstrap localBootstrap; private final ServerBootstrap tcpBootstrap; - private final ServerBootstrap udpBootstrap; + private final SessionBootstrap udpBootstrap; private final int tcpPort; private final int udpPort; @@ -125,7 +140,7 @@ class Server extends EndPointServer { } if (udpPort > 0) { - udpBootstrap = new ServerBootstrap(); + udpBootstrap = new SessionBootstrap(); } else { udpBootstrap = null; @@ -138,25 +153,25 @@ class Server extends EndPointServer { final EventLoopGroup boss; final EventLoopGroup worker; - // if (OS.isAndroid()) { - // // android ONLY supports OIO (not NIO) - // boss = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); - // worker = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); - // } - // else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // // JNI network stack is MUCH faster (but only on linux) - // boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); - // worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); - // } - // else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // // KQueue network stack is MUCH faster (but only on macosx) - // boss = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); - // worker = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); - // } - // else { + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + boss = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); + worker = new OioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // JNI network stack is MUCH faster (but only on linux) + boss = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); + worker = new EpollEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + boss = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); + worker = new KQueueEventLoopGroup(EndPoint.DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); + } + else { boss = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName + "-boss", threadGroup)); worker = new NioEventLoopGroup(DEFAULT_THREAD_POOL_SIZE, new NamedThreadFactory(threadName, threadGroup)); - // } + } manageForShutdown(boss); manageForShutdown(worker); @@ -185,21 +200,21 @@ class Server extends EndPointServer { } if (tcpBootstrap != null) { - // if (OS.isAndroid()) { - // // android ONLY supports OIO (not NIO) - // tcpBootstrap.channel(OioServerSocketChannel.class); - // } - // else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // // JNI network stack is MUCH faster (but only on linux) - // tcpBootstrap.channel(EpollServerSocketChannel.class); - // } - // else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // // KQueue network stack is MUCH faster (but only on macosx) - // tcpBootstrap.channel(KQueueServerSocketChannel.class); - // } - // else { + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + tcpBootstrap.channel(OioServerSocketChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // JNI network stack is MUCH faster (but only on linux) + tcpBootstrap.channel(EpollServerSocketChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + tcpBootstrap.channel(KQueueServerSocketChannel.class); + } + else { tcpBootstrap.channel(NioServerSocketChannel.class); - // } + } // TODO: If we use netty for an HTTP server, // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server. @@ -230,23 +245,22 @@ class Server extends EndPointServer { if (udpBootstrap != null) { - // if (OS.isAndroid()) { - // // android ONLY supports OIO (not NIO) - // udpBootstrap.channel(OioDatagramChannel.class); - // } - // else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // // JNI network stack is MUCH faster (but only on linux) - // udpBootstrap.channel(EpollDatagramChannel.class); - // } - // else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // // KQueue network stack is MUCH faster (but only on macosx) - // udpBootstrap.channel(KQueueDatagramChannel.class); - // } - // else { + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + udpBootstrap.channel(OioDatagramChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // JNI network stack is MUCH faster (but only on linux) + udpBootstrap.channel(EpollDatagramChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + udpBootstrap.channel(KQueueDatagramChannel.class); + } + else { // windows and linux/mac that are incompatible with the native implementations - // udpBootstrap.channel(NioDatagramChannel.class); - // } - udpBootstrap.channel(NioServerDatagramChannel.class); + udpBootstrap.channel(NioDatagramChannel.class); + } // Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify @@ -260,11 +274,7 @@ class Server extends EndPointServer { // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets) .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722 - - .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, - registrationWrapper)); - - + .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, registrationWrapper)); // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! // if (hostName.equals("0.0.0.0")) { diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java index f65be01c..06a15c92 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java @@ -44,7 +44,7 @@ class KryoDecoderUdp extends MessageToMessageDecoder { @Override public boolean acceptInboundMessage(final Object msg) throws Exception { - return msg instanceof ByteBuf || msg instanceof AddressedEnvelope; + return msg instanceof AddressedEnvelope; } /** @@ -74,15 +74,7 @@ class KryoDecoderUdp extends MessageToMessageDecoder { @Override protected void decode(ChannelHandlerContext context, Object message, List out) throws Exception { - ByteBuf data; - if (message instanceof AddressedEnvelope) { - // this is on the client - data = (ByteBuf) ((AddressedEnvelope) message).content(); - } else { - // this is on the server - data = (ByteBuf) message; - } - + ByteBuf data = (ByteBuf) ((AddressedEnvelope) message).content(); if (data != null) { try { diff --git a/src/io/netty/channel/socket/nio/DatagramSessionChannel.java b/src/io/netty/bootstrap/DatagramSessionChannel.java similarity index 75% rename from src/io/netty/channel/socket/nio/DatagramSessionChannel.java rename to src/io/netty/bootstrap/DatagramSessionChannel.java index 66315ec5..72f03d45 100644 --- a/src/io/netty/channel/socket/nio/DatagramSessionChannel.java +++ b/src/io/netty/bootstrap/DatagramSessionChannel.java @@ -13,15 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.netty.channel.socket.nio; +package io.netty.bootstrap; import java.net.InetSocketAddress; import java.net.SocketAddress; -import io.netty.buffer.ByteBuf; -import io.netty.channel.*; -import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; -import io.netty.channel.nio.NioEventLoop; +import io.netty.channel.AbstractChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.socket.DatagramPacket; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.RecyclableArrayList; @@ -41,22 +44,27 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); - protected final DatagramSessionChannelConfig config; + private final DatagramSessionChannelConfig config; - protected final NioServerDatagramChannel serverChannel; - protected final InetSocketAddress remote; + private SessionManager sessionManager; + private InetSocketAddress localAddress; + private InetSocketAddress remoteAddress; private volatile boolean isOpen = true; - private ByteBuf buffer; - protected - DatagramSessionChannel(NioServerDatagramChannel serverChannel, InetSocketAddress remote) { - super(serverChannel); - this.serverChannel = serverChannel; - this.remote = remote; + DatagramSessionChannel(final Channel parentChannel, + final SessionManager sessionManager, + final DatagramSessionChannelConfig sessionConfig, + final InetSocketAddress localAddress, + final InetSocketAddress remoteAddress) { + super(parentChannel); - config = new DatagramSessionChannelConfig(this, serverChannel); + this.sessionManager = sessionManager; + this.config = sessionConfig; + + this.localAddress = localAddress; + this.remoteAddress = remoteAddress; } @Override @@ -68,12 +76,6 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { @Override protected void doBeginRead() throws Exception { - // a single packet is 100% of our data, so we cannot have multiple reads (there is no "session" for UDP) - ChannelPipeline pipeline = pipeline(); - - pipeline.fireChannelRead(buffer); - pipeline.fireChannelReadComplete(); - buffer = null; } @Override @@ -86,7 +88,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { protected void doClose() throws Exception { isOpen = false; - serverChannel.doCloseChannel(this); + sessionManager.doCloseChannel(this); } @Override @@ -119,7 +121,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { } //schedule a task that will write those entries - NioEventLoop eventLoop = serverChannel.eventLoop(); + EventLoop eventLoop = parent().eventLoop(); if (eventLoop.inEventLoop()) { write0(list); } @@ -143,7 +145,8 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { @Override protected boolean isCompatible(EventLoop eventloop) { - return eventloop instanceof NioEventLoop; + // compatible with all Datagram event loops where we are explicitly used + return true; } @Override @@ -161,7 +164,7 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { @Override protected SocketAddress localAddress0() { - return serverChannel.localAddress0(); + return localAddress; } @Override @@ -180,24 +183,19 @@ class DatagramSessionChannel extends AbstractChannel implements Channel { @Override public InetSocketAddress remoteAddress() { - return remote; + return remoteAddress; } @Override protected InetSocketAddress remoteAddress0() { - return remote; - } - - public - void setBuffer(final ByteBuf buffer) { - this.buffer = buffer; + return remoteAddress; } private void write0(final RecyclableArrayList list) { try { - NioUnsafe unsafe = serverChannel.unsafe(); + Unsafe unsafe = super.parent().unsafe(); for (Object buf : list) { unsafe.write(buf, voidPromise()); diff --git a/src/io/netty/channel/socket/nio/DatagramSessionChannelConfig.java b/src/io/netty/bootstrap/DatagramSessionChannelConfig.java similarity index 86% rename from src/io/netty/channel/socket/nio/DatagramSessionChannelConfig.java rename to src/io/netty/bootstrap/DatagramSessionChannelConfig.java index af5c88ae..f26ac860 100644 --- a/src/io/netty/channel/socket/nio/DatagramSessionChannelConfig.java +++ b/src/io/netty/bootstrap/DatagramSessionChannelConfig.java @@ -13,13 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.netty.channel.socket.nio; +package io.netty.bootstrap; import java.util.Map; import dorkbox.network.connection.EndPoint; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultMessageSizeEstimator; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.DatagramChannelConfig; /** @@ -31,14 +37,14 @@ public class DatagramSessionChannelConfig implements ChannelConfig { private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR; - private final NioServerDatagramChannel serverDatagramSessionChannel; + private final Channel channel; /** * Creates a new instance. */ public - DatagramSessionChannelConfig(DatagramSessionChannel channel, final NioServerDatagramChannel serverDatagramSessionChannel) { - this.serverDatagramSessionChannel = serverDatagramSessionChannel; + DatagramSessionChannelConfig(Channel channel) { + this.channel = channel; } @Override @@ -56,7 +62,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig { @Override public T getOption(final ChannelOption option) { - return serverDatagramSessionChannel.config().getOption(option); + return channel.config().getOption(option); } @Override @@ -104,8 +110,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig { @Override public ByteBufAllocator getAllocator() { - return serverDatagramSessionChannel.config() - .getAllocator(); + return channel.config().getAllocator(); } @Override @@ -117,8 +122,7 @@ public class DatagramSessionChannelConfig implements ChannelConfig { @Override public T getRecvByteBufAllocator() { - return serverDatagramSessionChannel.config() - .getRecvByteBufAllocator(); + return channel.config().getRecvByteBufAllocator(); } @Override diff --git a/src/io/netty/bootstrap/SessionBootstrap.java b/src/io/netty/bootstrap/SessionBootstrap.java new file mode 100644 index 00000000..f7ffddc9 --- /dev/null +++ b/src/io/netty/bootstrap/SessionBootstrap.java @@ -0,0 +1,289 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.bootstrap; + +import java.net.SocketAddress; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.DefaultAddressResolverGroup; +import io.netty.util.AttributeKey; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * VERY similar to the ServerBootstrap class, with the change to having a "SessionManager" added to the pipeline on init (instead of + * the ServerBootstrapAcceptor getting added) + */ +public +class SessionBootstrap extends AbstractBootstrap { + private static final AddressResolverGroup DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE; + private static final InternalLogger logger = InternalLoggerFactory.getInstance(SessionBootstrap.class); + + @SuppressWarnings("unchecked") + private static + Entry, Object>[] newAttrArray(int size) { + return new Entry[size]; + } + + @SuppressWarnings("unchecked") + private static + Map.Entry, Object>[] newOptionArray(int size) { + return new Map.Entry[size]; + } + + + private final Map, Object> childOptions = new LinkedHashMap, Object>(); + private final Map, Object> childAttrs = new LinkedHashMap, Object>(); + private final SessionBootstrapConfig config = new SessionBootstrapConfig(this); + private volatile EventLoopGroup childGroup; + private volatile ChannelHandler childHandler; + @SuppressWarnings("unchecked") + private volatile AddressResolverGroup resolver = (AddressResolverGroup) DEFAULT_RESOLVER; + + public + SessionBootstrap() { } + + private + SessionBootstrap(SessionBootstrap bootstrap) { + super(bootstrap); + + resolver = bootstrap.resolver; + + childGroup = bootstrap.childGroup; + childHandler = bootstrap.childHandler; + + synchronized (bootstrap.childOptions) { + childOptions.putAll(bootstrap.childOptions); + } + synchronized (bootstrap.childAttrs) { + childAttrs.putAll(bootstrap.childAttrs); + } + } + + /** + * Set the specific {@link AttributeKey} with the given value on every child {@link Channel}. If the value is + * {@code null} the {@link AttributeKey} is removed + */ + public + SessionBootstrap childAttr(AttributeKey childKey, T value) { + if (childKey == null) { + throw new NullPointerException("childKey"); + } + if (value == null) { + childAttrs.remove(childKey); + } + else { + childAttrs.put(childKey, value); + } + return this; + } + + final + Map, Object> childAttrs() { + return copiedMap(childAttrs); + } + + /** + * Return the configured {@link EventLoopGroup} which will be used for the child channels or {@code null} + * if non is configured yet. + * + * @deprecated Use {@link #config()} instead. + */ + @Deprecated + public + EventLoopGroup childGroup() { + return childGroup; + } + + /** + * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. + */ + public + SessionBootstrap childHandler(ChannelHandler childHandler) { + if (childHandler == null) { + throw new NullPointerException("childHandler"); + } + this.childHandler = childHandler; + return this; + } + + final + ChannelHandler childHandler() { + return childHandler; + } + + /** + * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created + * (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set + * {@link ChannelOption}. + */ + public + SessionBootstrap childOption(ChannelOption childOption, T value) { + if (childOption == null) { + throw new NullPointerException("childOption"); + } + if (value == null) { + synchronized (childOptions) { + childOptions.remove(childOption); + } + } + else { + synchronized (childOptions) { + childOptions.put(childOption, value); + } + } + return this; + } + + final + Map, Object> childOptions() { + return copiedMap(childOptions); + } + + @Override + @SuppressWarnings("CloneDoesntCallSuperClone") + public + SessionBootstrap clone() { + return new SessionBootstrap(this); + } + + @Override + public final + SessionBootstrapConfig config() { + return config; + } + + /** + * Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client). + */ + @Override + public + SessionBootstrap group(EventLoopGroup group) { + return group(group, group); + } + + /** + * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These + * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link DatagramSessionChannel} and + * {@link Channel}'s. + */ + public + SessionBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { + super.group(parentGroup); + if (childGroup == null) { + throw new NullPointerException("childGroup"); + } + if (this.childGroup != null) { + throw new IllegalStateException("childGroup set already"); + } + this.childGroup = childGroup; + return this; + } + + @Override + void init(Channel channel) throws Exception { + final Map, Object> options = options0(); + synchronized (options) { + setChannelOptions(channel, options, logger); + } + + final Map, Object> attrs = attrs0(); + synchronized (attrs) { + for (Entry, Object> e : attrs.entrySet()) { + @SuppressWarnings("unchecked") + AttributeKey key = (AttributeKey) e.getKey(); + channel.attr(key) + .set(e.getValue()); + } + } + + ChannelPipeline p = channel.pipeline(); + + final EventLoopGroup currentChildGroup = childGroup; + final ChannelHandler currentChildHandler = childHandler; + final Entry, Object>[] currentChildOptions; + final Entry, Object>[] currentChildAttrs; + + + synchronized (childOptions) { + currentChildOptions = childOptions.entrySet() + .toArray(newOptionArray(childOptions.size())); + } + synchronized (childAttrs) { + currentChildAttrs = childAttrs.entrySet() + .toArray(newAttrArray(childAttrs.size())); + } + + p.addLast(new ChannelInitializer() { + @Override + public + void initChannel(final Channel ch) throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + ChannelHandler handler = config.handler(); + if (handler != null) { + pipeline.addLast(handler); + } + + ch.eventLoop() + .execute(new Runnable() { + @Override + public + void run() { + pipeline.addLast(new SessionManager(ch, + currentChildGroup, + currentChildHandler, + currentChildOptions, + currentChildAttrs)); + } + }); + } + }); + } + + public + SocketAddress remoteAddress() { + // the nature of this is that WE do not have a remote address, but our child channels DO have a remote address + return null; + } + + public + AddressResolverGroup resolver() { + return resolver; + } + + @Override + public + SessionBootstrap validate() { + super.validate(); + if (childHandler == null) { + throw new IllegalStateException("childHandler not set"); + } + if (childGroup == null) { + logger.warn("childGroup is not set. Using parentGroup instead."); + childGroup = config.group(); + } + return this; + } +} diff --git a/src/io/netty/bootstrap/SessionBootstrapConfig.java b/src/io/netty/bootstrap/SessionBootstrapConfig.java new file mode 100644 index 00000000..e1e8eb94 --- /dev/null +++ b/src/io/netty/bootstrap/SessionBootstrapConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.bootstrap; + +import java.net.SocketAddress; + +import io.netty.channel.Channel; +import io.netty.resolver.AddressResolverGroup; + +/** + * Exposes the configuration of a {@link Bootstrap}. + */ +public final +class SessionBootstrapConfig extends AbstractBootstrapConfig { + + SessionBootstrapConfig(SessionBootstrap bootstrap) { + super(bootstrap); + } + + /** + * Returns the configured remote address or {@code null} if non is configured yet. + */ + public + SocketAddress remoteAddress() { + return bootstrap.remoteAddress(); + } + + /** + * Returns the configured {@link AddressResolverGroup} or the default if non is configured yet. + */ + public + AddressResolverGroup resolver() { + return bootstrap.resolver(); + } + + @Override + public + String toString() { + StringBuilder buf = new StringBuilder(super.toString()); + buf.setLength(buf.length() - 1); + buf.append(", resolver: ") + .append(resolver()); + SocketAddress remoteAddress = remoteAddress(); + if (remoteAddress != null) { + buf.append(", remoteAddress: ") + .append(remoteAddress); + } + return buf.append(')') + .toString(); + } +} diff --git a/src/io/netty/bootstrap/SessionManager.java b/src/io/netty/bootstrap/SessionManager.java new file mode 100644 index 00000000..2d8249dd --- /dev/null +++ b/src/io/netty/bootstrap/SessionManager.java @@ -0,0 +1,245 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.bootstrap; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import dorkbox.network.pipeline.discovery.BroadcastServer; +import dorkbox.util.bytes.BigEndian.Long_; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.AttributeKey; +import io.netty.util.collection.LongObjectHashMap; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * + */ +public +class SessionManager extends ChannelInboundHandlerAdapter { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(SessionBootstrap.class); + + private static + void forceClose(DatagramSessionChannel child, Throwable t) throws Exception { + child.unsafe() + .closeForcibly(); + + child.doClose(); + + logger.warn("Failed to register an accepted channel: {}", child, t); + } + + private static + long getChannelId(final InetSocketAddress remoteAddress) { + int address = remoteAddress.getAddress() + .hashCode(); // we want it as an int + + int port = remoteAddress.getPort(); // this is really just 2 bytes + + byte[] combined = new byte[8]; + combined[0] = (byte) ((port >>> 24) & 0xFF); + combined[1] = (byte) ((port >>> 16) & 0xFF); + combined[2] = (byte) ((port >>> 8) & 0xFF); + combined[3] = (byte) ((port) & 0xFF); + combined[4] = (byte) ((address >>> 24) & 0xFF); + combined[5] = (byte) ((address >>> 16) & 0xFF); + combined[6] = (byte) ((address >>> 8) & 0xFF); + combined[7] = (byte) ((address) & 0xFF); + + return Long_.from(combined); + } + + private final BroadcastServer broadcastServer = new BroadcastServer(); + // Does not need to be thread safe, because access only happens in the event loop + private final LongObjectHashMap datagramChannels = new LongObjectHashMap(); + + + private final EventLoopGroup childGroup; + private final ChannelHandler childHandler; + + private final Entry, Object>[] childOptions; + private final Entry, Object>[] childAttrs; + + private final Runnable enableAutoReadTask; + private final DatagramSessionChannelConfig sessionConfig; + + + SessionManager(final Channel channel, + EventLoopGroup childGroup, + ChannelHandler childHandler, + Entry, Object>[] childOptions, + Entry, Object>[] childAttrs) { + + this.sessionConfig = new DatagramSessionChannelConfig(channel); + + this.childGroup = childGroup; + this.childHandler = childHandler; + this.childOptions = childOptions; + this.childAttrs = childAttrs; + + // Task which is scheduled to re-enable auto-read. + // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may + // not be able to load the class because of the file limit it already reached. + // + // See https://github.com/netty/netty/issues/1328 + enableAutoReadTask = new Runnable() { + @Override + public + void run() { + channel.config() + .setAutoRead(true); + } + }; + } + + @Override + public + void channelInactive(final ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + + // have to close all of the "fake" DatagramChannels as well. Each one will remove itself from the channel map. + + // We make a copy of this b/c of concurrent modification, in the event this is closed BEFORE the child-channels are closed + ArrayList channels = new ArrayList(datagramChannels.values()); + for (DatagramSessionChannel datagramSessionChannel : channels) { + datagramSessionChannel.close(); + } + } + + @Override + @SuppressWarnings("unchecked") + public + void channelRead(ChannelHandlerContext context, Object msg) { + Channel channel = context.channel(); + + DatagramPacket packet = ((DatagramPacket) msg); + + ByteBuf content = packet.content(); + InetSocketAddress localAddress = packet.recipient(); + InetSocketAddress remoteAddress = packet.sender(); + + // check to see if it's a broadcast packet or not + if (broadcastServer.isBroadcast(channel, content, localAddress, remoteAddress)) { + // don't bother creating channels if this is a broadcast event. Just respond and be finished + return; + } + + + long channelId = getChannelId(remoteAddress); + + // create a new channel or reuse existing one + DatagramSessionChannel sessionChannel = datagramChannels.get(channelId); + ChannelPipeline sessionPipeline; + if (sessionChannel == null) { + try { + sessionChannel = new DatagramSessionChannel(context.channel(), this, sessionConfig, localAddress, remoteAddress); + datagramChannels.put(channelId, sessionChannel); + + sessionPipeline = sessionChannel.pipeline(); + + // add the child handler to the fake channel + sessionPipeline.addLast(childHandler); + + // setup the channel options + AbstractBootstrap.setChannelOptions(sessionChannel, childOptions, logger); + + for (Entry, Object> e : childAttrs) { + sessionChannel.attr((AttributeKey) e.getKey()) + .set(e.getValue()); + } + + try { + final DatagramSessionChannel finalSessionChannel = sessionChannel; + childGroup.register(sessionChannel) + .addListener(new ChannelFutureListener() { + @Override + public + void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + forceClose(finalSessionChannel, future.cause()); + } + } + }); + } catch (Throwable t) { + forceClose(sessionChannel, t); + } + } catch (Throwable t) { + logger.warn("Failed to create a new datagram channel from a read operation.", t); + + try { + if (sessionChannel != null) { + sessionChannel.close(); + } + } catch (Throwable t2) { + logger.warn("Failed to close the datagram channel.", t2); + } + + return; + } + } else { + sessionPipeline = sessionChannel.pipeline(); + } + + // immediately trigger a read in the session pipeline + sessionPipeline.fireChannelRead(packet); + + // will flush the pipeline if necessary + sessionPipeline.fireChannelReadComplete(); + } + + /** + * ADDED to support closing a DatagramSessionChannel. Always called from the EventLoop + */ + public + void doCloseChannel(final DatagramSessionChannel datagramSessionChannel) { + InetSocketAddress remoteAddress = datagramSessionChannel.remoteAddress(); + long channelId = getChannelId(remoteAddress); + + datagramChannels.remove(channelId); + } + + @Override + public + void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + final ChannelConfig config = ctx.channel() + .config(); + if (config.isAutoRead()) { + // stop accept new connections for 1 second to allow the channel to recover + // See https://github.com/netty/netty/issues/1328 + config.setAutoRead(false); + ctx.channel() + .eventLoop() + .schedule(enableAutoReadTask, 1, TimeUnit.SECONDS); + } + // still let the exceptionCaught event flow through the pipeline to give the user + // a chance to do something with it + ctx.fireExceptionCaught(cause); + } +} diff --git a/src/io/netty/channel/socket/nio/NioServerDatagramChannel.java b/src/io/netty/channel/socket/nio/NioServerDatagramChannel.java deleted file mode 100644 index 6e1e2760..00000000 --- a/src/io/netty/channel/socket/nio/NioServerDatagramChannel.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Copyright 2018 dorkbox, llc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.List; - -import dorkbox.network.pipeline.discovery.BroadcastServer; -import dorkbox.util.bytes.BigEndian.Long_; -import io.netty.buffer.ByteBuf; -import io.netty.channel.*; -import io.netty.channel.nio.AbstractNioMessageChannel; -import io.netty.channel.socket.*; -import io.netty.util.collection.LongObjectHashMap; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.SocketUtils; -import io.netty.util.internal.StringUtil; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -/** - * An NIO datagram {@link Channel} that sends and receives an - * {@link AddressedEnvelope AddressedEnvelope}. - * - * @see AddressedEnvelope - * @see DatagramPacket - */ -public final -class NioServerDatagramChannel extends AbstractNioMessageChannel implements ServerSocketChannel { - - private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); - private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16); - - private static final String EXPECTED_TYPES = " (expected: " + - StringUtil.simpleClassName(DatagramPacket.class) + ", " + - StringUtil.simpleClassName(AddressedEnvelope.class) + '<' + - StringUtil.simpleClassName(ByteBuf.class) + ", " + - StringUtil.simpleClassName(SocketAddress.class) + ">, " + - StringUtil.simpleClassName(ByteBuf.class) + ')'; - - private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerDatagramChannel.class); - - - private static - java.nio.channels.DatagramChannel newSocket(SelectorProvider provider) { - try { - /** - * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in - * {@link SelectorProvider#provider()} which is called by each DatagramSessionChannel.open() otherwise. - * - * See #2308. - */ - return provider.openDatagramChannel(); - } catch (IOException e) { - throw new ChannelException("Failed to open a socket.", e); - } - } - - private static - java.nio.channels.DatagramChannel newSocket(SelectorProvider provider, InternetProtocolFamily ipFamily) { - if (ipFamily == null) { - return newSocket(provider); - } - - checkJavaVersion(); - - try { - return NioServerDatagramChannel7.newSocket(provider, ipFamily); - } catch (IOException e) { - throw new ChannelException("Failed to open a socket.", e); - } - } - - private static - void checkJavaVersion() { - if (PlatformDependent.javaVersion() < 7) { - throw new UnsupportedOperationException("Only supported on java 7+."); - } - } - - /** - * Checks if the specified buffer is a direct buffer and is composed of a single NIO buffer. - * (We check this because otherwise we need to make it a non-composite buffer.) - */ - private static - boolean isSingleDirectBuffer(ByteBuf buf) { - return buf.isDirect() && buf.nioBufferCount() == 1; - } - - private static - long getChannelId(final InetSocketAddress remoteAddress) { - int address = remoteAddress.getAddress() - .hashCode(); // we want it as an int - int port = remoteAddress.getPort(); // this is really just 2 bytes - - byte[] combined = new byte[8]; - combined[0] = (byte) ((port >>> 24) & 0xFF); - combined[1] = (byte) ((port >>> 16) & 0xFF); - combined[2] = (byte) ((port >>> 8) & 0xFF); - combined[3] = (byte) ((port) & 0xFF); - combined[4] = (byte) ((address >>> 24) & 0xFF); - combined[5] = (byte) ((address >>> 16) & 0xFF); - combined[6] = (byte) ((address >>> 8) & 0xFF); - combined[7] = (byte) ((address) & 0xFF); - - return Long_.from(combined); - } - private final ServerSocketChannelConfig config; - - - // Does not need to be thread safe, because access only happens in the event loop - private final LongObjectHashMap datagramChannels = new LongObjectHashMap(); - private BroadcastServer broadcastServer; - - /** - * Create a new instance which will use the Operation Systems default {@link InternetProtocolFamily}. - */ - public - NioServerDatagramChannel() { - this(newSocket(DEFAULT_SELECTOR_PROVIDER)); - } - - /** - * Create a new instance using the given {@link SelectorProvider} - * which will use the Operation Systems default {@link InternetProtocolFamily}. - */ - public - NioServerDatagramChannel(SelectorProvider provider) { - this(newSocket(provider)); - } - - /** - * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend - * on the Operation Systems default which will be chosen. - */ - public - NioServerDatagramChannel(InternetProtocolFamily ipFamily) { - this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily)); - } - - /** - * Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}. - * If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default - * which will be chosen. - */ - public - NioServerDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) { - this(newSocket(provider, ipFamily)); - } - - /** - * Create a new instance from the given {@link java.nio.channels.DatagramChannel}. - */ - public - NioServerDatagramChannel(java.nio.channels.DatagramChannel socket) { - super(null, socket, SelectionKey.OP_READ); - config = new NioServerDatagramChannelConfig(this, socket); - broadcastServer = new BroadcastServer(); - } - - void clearReadPending0() { - clearReadPending(); - } - - @Override - protected - boolean closeOnReadError(Throwable cause) { - // We do not want to close on SocketException when using DatagramSessionChannel as we usually can continue receiving. - // See https://github.com/netty/netty/issues/5893 - if (cause instanceof SocketException) { - return false; - } - return super.closeOnReadError(cause); - } - - @Override - public - ServerSocketChannelConfig config() { - return config; - } - - @Override - protected - boolean continueOnWriteError() { - // Continue on write error as a DatagramSessionChannel can write to multiple remote peers - // - // See https://github.com/netty/netty/issues/2665 - return true; - } - - @Override - protected - void doBind(SocketAddress localAddress) throws Exception { - doBind0(localAddress); - } - - private - void doBind0(SocketAddress localAddress) throws Exception { - if (PlatformDependent.javaVersion() >= 7) { - SocketUtils.bind(javaChannel(), localAddress); - } - else { - javaChannel().socket() - .bind(localAddress); - } - } - - // Always called from the EventLoop - @Override - protected - void doClose() throws Exception { - // have to close all of the fake DatagramChannels as well. Each one will remove itself from the channel map. - - // We make a copy of this b/c of concurrent modification, in the event this is closed BEFORE the child-channels are closed - ArrayList channels = new ArrayList(datagramChannels.values()); - for (DatagramSessionChannel datagramSessionChannel : channels) { - datagramSessionChannel.close(); - } - - javaChannel().close(); - } - - /** - * ADDED to support closing a DatagramSessionChannel. Always called from the EventLoop - */ - public - void doCloseChannel(final DatagramSessionChannel datagramSessionChannel) { - InetSocketAddress remoteAddress = datagramSessionChannel.remoteAddress(); - long channelId = getChannelId(remoteAddress); - - datagramChannels.remove(channelId); - } - - @Override - protected - boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { - // Unnecessary stuff - throw new UnsupportedOperationException(); - } - - @Override - protected - void doDisconnect() throws Exception { - // Unnecessary stuff - throw new UnsupportedOperationException(); - } - - @Override - protected - void doFinishConnect() throws Exception { - // Unnecessary stuff - throw new UnsupportedOperationException(); - } - - - @Override - protected - int doReadMessages(List buf) throws Exception { - java.nio.channels.DatagramChannel ch = javaChannel(); - ServerSocketChannelConfig config = config(); - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - ChannelPipeline pipeline = pipeline(); - - ByteBuf data = allocHandle.allocate(config.getAllocator()); - allocHandle.attemptedBytesRead(data.writableBytes()); - boolean free = true; - try { - ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); - int pos = nioData.position(); - InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData); - if (remoteAddress == null) { - return 0; - } - - allocHandle.lastBytesRead(nioData.position() - pos); - ByteBuf byteBuf = data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()); - // original behavior from NioDatagramChannel. - // buf.add(new DatagramPacket(byteBuf, localAddress(), remoteAddress)); - // free = false; - // return 1; - - - - // new behavior - - // check to see if it's a broadcast packet or not - ByteBuf broadcast = broadcastServer.getBroadcastResponse(byteBuf, remoteAddress); - if (broadcast != null) { - // don't bother creating channels if this is a broadcast event. Just respond and be finished - doWriteBytes(broadcast, remoteAddress); - // no messages created (since we directly write to the channel). - return 0; - } - - - long channelId = getChannelId(remoteAddress); - - // create a new channel or reuse existing one - DatagramSessionChannel channel = datagramChannels.get(channelId); - if (channel == null) { - try { - channel = new DatagramSessionChannel(this, remoteAddress); - datagramChannels.put(channelId, channel); - - // This channel is registered automatically AFTER this read method completes - } catch (Throwable t) { - logger.warn("Failed to create a new datagram channel from a read operation.", t); - - try { - channel.close(); - } catch (Throwable t2) { - logger.warn("Failed to close the datagram channel.", t2); - } - - return 0; - } - } - - // set the bytes of the datagram channel - channel.setBuffer(byteBuf); - - pipeline.fireChannelRead(channel); - - // immediately trigger a read - channel.read(); - - free = false; - // we manually fireChannelRead + read (caller class calls readComplete for us) - return 0; - } catch (Throwable cause) { - PlatformDependent.throwException(cause); - return -1; // -1 means to close this channel - } finally { - if (free) { - data.release(); - } - } - } - - @Override - protected - boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { - final SocketAddress remoteAddress; - final ByteBuf data; - if (msg instanceof AddressedEnvelope) { - @SuppressWarnings("unchecked") - AddressedEnvelope envelope = (AddressedEnvelope) msg; - remoteAddress = envelope.recipient(); - data = envelope.content(); - } - else { - data = (ByteBuf) msg; - remoteAddress = null; - } - - return doWriteBytes(data, remoteAddress); - } - - private - boolean doWriteBytes(final ByteBuf data, final SocketAddress remoteAddress) throws IOException { - final int dataLen = data.readableBytes(); - if (dataLen == 0) { - return true; - } - - final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen); - final int writtenBytes; - if (remoteAddress != null) { - writtenBytes = javaChannel().send(nioData, remoteAddress); - } - else { - writtenBytes = javaChannel().write(nioData); - } - return writtenBytes > 0; - } - - @Override - protected - Object filterOutboundMessage(Object msg) { - if (msg instanceof DatagramPacket) { - DatagramPacket p = (DatagramPacket) msg; - ByteBuf content = p.content(); - if (isSingleDirectBuffer(content)) { - return p; - } - return new DatagramPacket(newDirectBuffer(p, content), p.recipient()); - } - - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (isSingleDirectBuffer(buf)) { - return buf; - } - return newDirectBuffer(buf); - } - - if (msg instanceof AddressedEnvelope) { - @SuppressWarnings("unchecked") - AddressedEnvelope e = (AddressedEnvelope) msg; - if (e.content() instanceof ByteBuf) { - ByteBuf content = (ByteBuf) e.content(); - if (isSingleDirectBuffer(content)) { - return e; - } - return new DefaultAddressedEnvelope(newDirectBuffer(e, content), e.recipient()); - } - } - - throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); - } - - @Override - @SuppressWarnings("deprecation") - public - boolean isActive() { - java.nio.channels.DatagramChannel ch = javaChannel(); - // we do not support registration options - // return ch.isOpen() && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered() || ch.socket().isBound()); - return ch.isOpen() || ch.socket() - .isBound(); - } - - @Override - protected - java.nio.channels.DatagramChannel javaChannel() { - return (java.nio.channels.DatagramChannel) super.javaChannel(); - } - - @Override - public - InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - - @Override - protected - SocketAddress localAddress0() { - return javaChannel().socket() - .getLocalSocketAddress(); - } - - @Override - public - ChannelMetadata metadata() { - return METADATA; - } - - @Override - public - InetSocketAddress remoteAddress() { - return null; - } - - @Override - protected - SocketAddress remoteAddress0() { - return null; - } - - @Override - @Deprecated - protected - void setReadPending(boolean readPending) { - super.setReadPending(readPending); - } - - @Override - public - String toString() { - return "NioServerDatagramChannel"; - } -} diff --git a/src/io/netty/channel/socket/nio/NioServerDatagramChannel7.java b/src/io/netty/channel/socket/nio/NioServerDatagramChannel7.java deleted file mode 100644 index 87a16110..00000000 --- a/src/io/netty/channel/socket/nio/NioServerDatagramChannel7.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2018 dorkbox, llc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.channel.socket.nio; - -import java.io.IOException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.spi.SelectorProvider; - -import io.netty.channel.socket.InternetProtocolFamily; - -/** - * For Java7+ only! - */ -public -class NioServerDatagramChannel7 { - // NOTE: this is suppressed because we compile this for java7, and everything else for java6, and this is only called if we are java7+ - @SuppressWarnings("Since15") - public static - DatagramChannel newSocket(final SelectorProvider provider, final InternetProtocolFamily ipFamily) throws IOException { - return provider.openDatagramChannel(ProtocolFamilyConverter.convert(ipFamily)); - } -} diff --git a/src/io/netty/channel/socket/nio/NioServerDatagramChannelConfig.java b/src/io/netty/channel/socket/nio/NioServerDatagramChannelConfig.java deleted file mode 100644 index d37e469e..00000000 --- a/src/io/netty/channel/socket/nio/NioServerDatagramChannelConfig.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2018 dorkbox, llc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.channel.socket.nio; - -import java.net.SocketException; -import java.nio.channels.DatagramChannel; - -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.*; -import io.netty.channel.socket.ServerSocketChannelConfig; - -/** - * This is a basic implementation of a ChannelConfig, with the exception that we take in a DatagramSessionChannel, and modify only the - * options of that channel that make sense - */ -public final -class NioServerDatagramChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { - private final DatagramChannel datagramChannel; - - public - NioServerDatagramChannelConfig(NioServerDatagramChannel channel, DatagramChannel datagramChannel) { - super(channel); - this.datagramChannel = datagramChannel; - } - - @Override - public - int getBacklog() { - return 1; - } - - @Override - public - ServerSocketChannelConfig setBacklog(final int backlog) { - return this; - } - - @Override - public - boolean isReuseAddress() { - try { - return datagramChannel.socket() - .getReuseAddress(); - } catch (SocketException e) { - throw new ChannelException(e); - } - } - - @Override - public - ServerSocketChannelConfig setReuseAddress(final boolean reuseAddress) { - try { - datagramChannel.socket() - .setReuseAddress(true); - } catch (SocketException e) { - throw new ChannelException(e); - } - - return this; - } - - @Override - public - int getReceiveBufferSize() { - try { - return datagramChannel.socket() - .getReceiveBufferSize(); - } catch (SocketException e) { - throw new ChannelException(e); - } - } - - @Override - public - ServerSocketChannelConfig setReceiveBufferSize(final int receiveBufferSize) { - try { - datagramChannel.socket() - .setReceiveBufferSize(receiveBufferSize); - } catch (SocketException e) { - throw new ChannelException(e); - } - - return this; - } - - @Override - public - ServerSocketChannelConfig setPerformancePreferences(final int connectionTime, final int latency, final int bandwidth) { - return this; - } - - @Override - public - ServerSocketChannelConfig setConnectTimeoutMillis(int timeout) { - return this; - } - - @Override - @Deprecated - public - ServerSocketChannelConfig setMaxMessagesPerRead(int n) { - super.setMaxMessagesPerRead(n); - return this; - } - - @Override - public - ServerSocketChannelConfig setWriteSpinCount(int spincount) { - super.setWriteSpinCount(spincount); - return this; - } - - @Override - public - ServerSocketChannelConfig setAllocator(ByteBufAllocator alloc) { - super.setAllocator(alloc); - return this; - } - - @Override - public - ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator alloc) { - super.setRecvByteBufAllocator(alloc); - return this; - } - - @Override - public - ServerSocketChannelConfig setAutoRead(boolean autoread) { - super.setAutoRead(autoread); - return this; - } - - @Override - public - ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { - return (ServerSocketChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); - } - - @Override - public - ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { - return (ServerSocketChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); - } - - @Override - public - ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { - return (ServerSocketChannelConfig) super.setWriteBufferWaterMark(writeBufferWaterMark); - } - - @Override - public - ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator est) { - super.setMessageSizeEstimator(est); - return this; - } -}