From 1cf4457f7727a768b66a4592d6eed4c5c741a47b Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 8 May 2020 16:29:59 +0200 Subject: [PATCH] Updated version --- src/dorkbox/network/Broadcast.java | 662 +++++++++--------- src/dorkbox/network/Client.java | 1030 ++++++++++++++-------------- src/dorkbox/network/Server.java | 970 +++++++++++++------------- 3 files changed, 1331 insertions(+), 1331 deletions(-) diff --git a/src/dorkbox/network/Broadcast.java b/src/dorkbox/network/Broadcast.java index 1595ff33..896183f6 100644 --- a/src/dorkbox/network/Broadcast.java +++ b/src/dorkbox/network/Broadcast.java @@ -1,331 +1,331 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network; - -import java.io.IOException; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.InterfaceAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; - -import dorkbox.network.pipeline.MagicBytes; -import dorkbox.network.pipeline.discovery.BroadcastResponse; -import dorkbox.network.pipeline.discovery.ClientDiscoverHostHandler; -import dorkbox.network.pipeline.discovery.ClientDiscoverHostInitializer; -import dorkbox.util.OS; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.channel.socket.DatagramPacket; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.oio.OioDatagramChannel; - -@SuppressWarnings({"unused", "AutoBoxing"}) -public final -class Broadcast { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Client.class.getSimpleName()); - - /** - * Gets the version number. - */ - public static - String getVersion() { - return "4.0"; - } - - /** - * Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned. - *

- * From KryoNet - * - * @param udpPort - * The UDP port of the server. - * @param discoverTimeoutMillis - * The number of milliseconds to wait for a response. - * - * @return the first server found, or null if no server responded. - */ - public static - BroadcastResponse discoverHost(int udpPort, int discoverTimeoutMillis) throws IOException { - BroadcastResponse discoverHost = discoverHostAddress(udpPort, discoverTimeoutMillis); - if (discoverHost != null) { - return discoverHost; - } - return null; - } - - /** - * Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned. - * - * @param udpPort - * The UDP port of the server. - * @param discoverTimeoutMillis - * The number of milliseconds to wait for a response. - * - * @return the first server found, or null if no server responded. - */ - public static - BroadcastResponse discoverHostAddress(int udpPort, int discoverTimeoutMillis) throws IOException { - List servers = discoverHosts0(logger, udpPort, discoverTimeoutMillis, false); - if (servers.isEmpty()) { - return null; - } - else { - return servers.get(0); - } - } - - /** - * Broadcasts a UDP message on the LAN to discover all running servers. - * - * @param udpPort - * The UDP port of the server. - * @param discoverTimeoutMillis - * The number of milliseconds to wait for a response. - * - * @return the list of found servers (if they responded) - */ - public static - List discoverHosts(int udpPort, int discoverTimeoutMillis) throws IOException { - return discoverHosts0(logger, udpPort, discoverTimeoutMillis, true); - } - - - static - List discoverHosts0(Logger logger, int udpPort, int discoverTimeoutMillis, boolean fetchAllServers) throws IOException { - // fetch a buffer that contains the serialized object. - ByteBuf buffer = Unpooled.wrappedBuffer(new byte[]{MagicBytes.broadcastID}); - - List servers = new ArrayList(); - - Enumeration networkInterfaces; - try { - networkInterfaces = NetworkInterface.getNetworkInterfaces(); - } catch (SocketException e) { - if (logger != null) { - logger.error("Host discovery failed.", e); - } - throw new IOException("Host discovery failed. No interfaces found."); - } - - - scan: - for (NetworkInterface networkInterface : Collections.list(networkInterfaces)) { - for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) { - InetAddress address = interfaceAddress.getAddress(); - InetAddress broadcast = interfaceAddress.getBroadcast(); - - // don't use IPv6! - if (address instanceof Inet6Address) { - if (logger != null) { - if (logger.isInfoEnabled()) { - logger.info("Not using IPv6 address: {}", address); - } - } - continue; - } - - - try { - if (logger != null) { - if (logger.isInfoEnabled()) { - logger.info("Searching for host on [{}:{}]", address.getHostAddress(), udpPort); - } - } - - EventLoopGroup group; - Class channelClass; - - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - group = new OioEventLoopGroup(1); - channelClass = OioDatagramChannel.class; - } else { - group = new NioEventLoopGroup(1); - channelClass = NioDatagramChannel.class; - } - - - Bootstrap udpBootstrap = new Bootstrap().group(group) - .channel(channelClass) - .option(ChannelOption.SO_BROADCAST, true) - .handler(new ClientDiscoverHostInitializer()) - .localAddress(new InetSocketAddress(address, - 0)); // pick random address. Not listen for broadcast. - - // we don't care about RECEIVING a broadcast packet, we are only SENDING one. - ChannelFuture future; - try { - future = udpBootstrap.bind(); - future.await(); - } catch (InterruptedException e) { - if (logger != null) { - logger.error("Could not bind to random UDP address on the server.", e.getCause()); - } - throw new IOException("Could not bind to random UDP address on the server."); - } - - if (!future.isSuccess()) { - if (logger != null) { - logger.error("Could not bind to random UDP address on the server.", future.cause()); - } - throw new IOException("Could not bind to random UDP address on the server."); - } - - Channel channel1 = future.channel(); - - if (broadcast != null) { - // try the "defined" broadcast first if we have it (not always!) - channel1.writeAndFlush(new DatagramPacket(buffer, new InetSocketAddress(broadcast, udpPort))); - - // response is received. If the channel is not closed within 5 seconds, move to the next one. - if (!channel1.closeFuture().awaitUninterruptibly(discoverTimeoutMillis)) { - if (logger != null) { - if (logger.isInfoEnabled()) { - logger.info("Host discovery timed out."); - } - } - } - else { - BroadcastResponse broadcastResponse = channel1.attr(ClientDiscoverHostHandler.STATE).get(); - servers.add(broadcastResponse); - } - - - // keep going if we want to fetch all servers. Break if we found one. - if (!(fetchAllServers || servers.isEmpty())) { - channel1.close().await(); - group.shutdownGracefully().await(); - break scan; - } - } - - // continue with "common" broadcast addresses. - // Java 1.5 doesn't support getting the subnet mask, so try them until we find one. - - byte[] ip = address.getAddress(); - for (int octect = 3; octect >= 0; octect--) { - ip[octect] = -1; // 255.255.255.0 - - // don't error out on one particular octect - try { - InetAddress byAddress = InetAddress.getByAddress(ip); - channel1.writeAndFlush(new DatagramPacket(buffer, new InetSocketAddress(byAddress, udpPort))); - - - // response is received. If the channel is not closed within 5 seconds, move to the next one. - if (!channel1.closeFuture().awaitUninterruptibly(discoverTimeoutMillis)) { - if (logger != null) { - if (logger.isInfoEnabled()) { - logger.info("Host discovery timed out."); - } - } - } - else { - BroadcastResponse broadcastResponse = channel1.attr(ClientDiscoverHostHandler.STATE).get(); - servers.add(broadcastResponse); - - if (!fetchAllServers) { - break; - } - } - } catch (Exception ignored) { - } - } - - channel1.close().sync(); - group.shutdownGracefully(0, discoverTimeoutMillis, TimeUnit.MILLISECONDS); - - } catch (Exception ignored) { - } - - // keep going if we want to fetch all servers. Break if we found one. - if (!(fetchAllServers || servers.isEmpty())) { - break scan; - } - } - } - - - if (logger != null && logger.isInfoEnabled() && !servers.isEmpty()) { - StringBuilder stringBuilder = new StringBuilder(256); - - if (fetchAllServers) { - stringBuilder.append("Discovered servers: (") - .append(servers.size()) - .append(")"); - - for (BroadcastResponse server : servers) { - stringBuilder.append("/n") - .append(server.remoteAddress) - .append(":"); - - if (server.tcpPort > 0) { - stringBuilder.append(server.tcpPort); - - if (server.udpPort > 0) { - stringBuilder.append(":"); - } - } - if (server.udpPort > 0) { - stringBuilder.append(udpPort); - } - } - logger.info(stringBuilder.toString()); - } - else { - BroadcastResponse server = servers.get(0); - stringBuilder.append(server.remoteAddress) - .append(":"); - - if (server.tcpPort > 0) { - stringBuilder.append(server.tcpPort); - - if (server.udpPort > 0) { - stringBuilder.append(":"); - } - } - if (server.udpPort > 0) { - stringBuilder.append(udpPort); - } - - logger.info("Discovered server [{}]", stringBuilder.toString()); - } - } - - return servers; - } - - private - Broadcast() { - } -} - +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; + +import dorkbox.network.pipeline.MagicBytes; +import dorkbox.network.pipeline.discovery.BroadcastResponse; +import dorkbox.network.pipeline.discovery.ClientDiscoverHostHandler; +import dorkbox.network.pipeline.discovery.ClientDiscoverHostInitializer; +import dorkbox.util.OS; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.oio.OioDatagramChannel; + +@SuppressWarnings({"unused", "AutoBoxing"}) +public final +class Broadcast { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Client.class.getSimpleName()); + + /** + * Gets the version number. + */ + public static + String getVersion() { + return "4.1"; + } + + /** + * Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned. + *

+ * From KryoNet + * + * @param udpPort + * The UDP port of the server. + * @param discoverTimeoutMillis + * The number of milliseconds to wait for a response. + * + * @return the first server found, or null if no server responded. + */ + public static + BroadcastResponse discoverHost(int udpPort, int discoverTimeoutMillis) throws IOException { + BroadcastResponse discoverHost = discoverHostAddress(udpPort, discoverTimeoutMillis); + if (discoverHost != null) { + return discoverHost; + } + return null; + } + + /** + * Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned. + * + * @param udpPort + * The UDP port of the server. + * @param discoverTimeoutMillis + * The number of milliseconds to wait for a response. + * + * @return the first server found, or null if no server responded. + */ + public static + BroadcastResponse discoverHostAddress(int udpPort, int discoverTimeoutMillis) throws IOException { + List servers = discoverHosts0(logger, udpPort, discoverTimeoutMillis, false); + if (servers.isEmpty()) { + return null; + } + else { + return servers.get(0); + } + } + + /** + * Broadcasts a UDP message on the LAN to discover all running servers. + * + * @param udpPort + * The UDP port of the server. + * @param discoverTimeoutMillis + * The number of milliseconds to wait for a response. + * + * @return the list of found servers (if they responded) + */ + public static + List discoverHosts(int udpPort, int discoverTimeoutMillis) throws IOException { + return discoverHosts0(logger, udpPort, discoverTimeoutMillis, true); + } + + + static + List discoverHosts0(Logger logger, int udpPort, int discoverTimeoutMillis, boolean fetchAllServers) throws IOException { + // fetch a buffer that contains the serialized object. + ByteBuf buffer = Unpooled.wrappedBuffer(new byte[]{MagicBytes.broadcastID}); + + List servers = new ArrayList(); + + Enumeration networkInterfaces; + try { + networkInterfaces = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e) { + if (logger != null) { + logger.error("Host discovery failed.", e); + } + throw new IOException("Host discovery failed. No interfaces found."); + } + + + scan: + for (NetworkInterface networkInterface : Collections.list(networkInterfaces)) { + for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) { + InetAddress address = interfaceAddress.getAddress(); + InetAddress broadcast = interfaceAddress.getBroadcast(); + + // don't use IPv6! + if (address instanceof Inet6Address) { + if (logger != null) { + if (logger.isInfoEnabled()) { + logger.info("Not using IPv6 address: {}", address); + } + } + continue; + } + + + try { + if (logger != null) { + if (logger.isInfoEnabled()) { + logger.info("Searching for host on [{}:{}]", address.getHostAddress(), udpPort); + } + } + + EventLoopGroup group; + Class channelClass; + + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + group = new OioEventLoopGroup(1); + channelClass = OioDatagramChannel.class; + } else { + group = new NioEventLoopGroup(1); + channelClass = NioDatagramChannel.class; + } + + + Bootstrap udpBootstrap = new Bootstrap().group(group) + .channel(channelClass) + .option(ChannelOption.SO_BROADCAST, true) + .handler(new ClientDiscoverHostInitializer()) + .localAddress(new InetSocketAddress(address, + 0)); // pick random address. Not listen for broadcast. + + // we don't care about RECEIVING a broadcast packet, we are only SENDING one. + ChannelFuture future; + try { + future = udpBootstrap.bind(); + future.await(); + } catch (InterruptedException e) { + if (logger != null) { + logger.error("Could not bind to random UDP address on the server.", e.getCause()); + } + throw new IOException("Could not bind to random UDP address on the server."); + } + + if (!future.isSuccess()) { + if (logger != null) { + logger.error("Could not bind to random UDP address on the server.", future.cause()); + } + throw new IOException("Could not bind to random UDP address on the server."); + } + + Channel channel1 = future.channel(); + + if (broadcast != null) { + // try the "defined" broadcast first if we have it (not always!) + channel1.writeAndFlush(new DatagramPacket(buffer, new InetSocketAddress(broadcast, udpPort))); + + // response is received. If the channel is not closed within 5 seconds, move to the next one. + if (!channel1.closeFuture().awaitUninterruptibly(discoverTimeoutMillis)) { + if (logger != null) { + if (logger.isInfoEnabled()) { + logger.info("Host discovery timed out."); + } + } + } + else { + BroadcastResponse broadcastResponse = channel1.attr(ClientDiscoverHostHandler.STATE).get(); + servers.add(broadcastResponse); + } + + + // keep going if we want to fetch all servers. Break if we found one. + if (!(fetchAllServers || servers.isEmpty())) { + channel1.close().await(); + group.shutdownGracefully().await(); + break scan; + } + } + + // continue with "common" broadcast addresses. + // Java 1.5 doesn't support getting the subnet mask, so try them until we find one. + + byte[] ip = address.getAddress(); + for (int octect = 3; octect >= 0; octect--) { + ip[octect] = -1; // 255.255.255.0 + + // don't error out on one particular octect + try { + InetAddress byAddress = InetAddress.getByAddress(ip); + channel1.writeAndFlush(new DatagramPacket(buffer, new InetSocketAddress(byAddress, udpPort))); + + + // response is received. If the channel is not closed within 5 seconds, move to the next one. + if (!channel1.closeFuture().awaitUninterruptibly(discoverTimeoutMillis)) { + if (logger != null) { + if (logger.isInfoEnabled()) { + logger.info("Host discovery timed out."); + } + } + } + else { + BroadcastResponse broadcastResponse = channel1.attr(ClientDiscoverHostHandler.STATE).get(); + servers.add(broadcastResponse); + + if (!fetchAllServers) { + break; + } + } + } catch (Exception ignored) { + } + } + + channel1.close().sync(); + group.shutdownGracefully(0, discoverTimeoutMillis, TimeUnit.MILLISECONDS); + + } catch (Exception ignored) { + } + + // keep going if we want to fetch all servers. Break if we found one. + if (!(fetchAllServers || servers.isEmpty())) { + break scan; + } + } + } + + + if (logger != null && logger.isInfoEnabled() && !servers.isEmpty()) { + StringBuilder stringBuilder = new StringBuilder(256); + + if (fetchAllServers) { + stringBuilder.append("Discovered servers: (") + .append(servers.size()) + .append(")"); + + for (BroadcastResponse server : servers) { + stringBuilder.append("/n") + .append(server.remoteAddress) + .append(":"); + + if (server.tcpPort > 0) { + stringBuilder.append(server.tcpPort); + + if (server.udpPort > 0) { + stringBuilder.append(":"); + } + } + if (server.udpPort > 0) { + stringBuilder.append(udpPort); + } + } + logger.info(stringBuilder.toString()); + } + else { + BroadcastResponse server = servers.get(0); + stringBuilder.append(server.remoteAddress) + .append(":"); + + if (server.tcpPort > 0) { + stringBuilder.append(server.tcpPort); + + if (server.udpPort > 0) { + stringBuilder.append(":"); + } + } + if (server.udpPort > 0) { + stringBuilder.append(udpPort); + } + + logger.info("Discovered server [{}]", stringBuilder.toString()); + } + } + + return servers; + } + + private + Broadcast() { + } +} + diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index 276443e6..c1b7ca6b 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -1,515 +1,515 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network; - -import static dorkbox.network.pipeline.ConnectionType.LOCAL; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import dorkbox.network.connection.BootstrapWrapper; -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.EndPointClient; -import dorkbox.network.connection.RegistrationWrapperClient; -import dorkbox.network.connection.idle.IdleBridge; -import dorkbox.network.connection.idle.IdleSender; -import dorkbox.network.connection.registration.local.RegistrationLocalHandlerClient; -import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientTCP; -import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientUDP; -import dorkbox.network.rmi.RemoteObject; -import dorkbox.network.rmi.RemoteObjectCallback; -import dorkbox.network.rmi.TimeoutException; -import dorkbox.util.OS; -import dorkbox.util.exceptions.SecurityException; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollDatagramChannel; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.kqueue.KQueueDatagramChannel; -import io.netty.channel.kqueue.KQueueSocketChannel; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.channel.socket.oio.OioDatagramChannel; -import io.netty.channel.socket.oio.OioSocketChannel; - -/** - * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's - * ASYNC. - */ -@SuppressWarnings({"unused", "WeakerAccess"}) -public -class Client extends EndPointClient implements Connection { - /** - * Gets the version number. - */ - public static - String getVersion() { - return "4.0"; - } - - private final String localChannelName; - private final String hostName; - private Configuration config; - - /** - * Starts a LOCAL only client, with the default local channel name and serialization scheme - */ - public - Client() throws SecurityException { - this(Configuration.localOnly()); - } - - /** - * Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme - */ - public - Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException { - this(new Configuration(host, tcpPort, udpPort, localChannelName)); - } - - /** - * Starts a REMOTE only client, which will connect to the specified host using the specified Connections Options - */ - @SuppressWarnings("AutoBoxing") - public - Client(final Configuration config) throws SecurityException { - super(config); - - String threadName = Client.class.getSimpleName(); - - this.config = config; - boolean hostConfigured = (config.tcpPort > 0 || config.udpPort > 0) && config.host != null; - boolean isLocalChannel = config.localChannelName != null; - - if (isLocalChannel && hostConfigured) { - String msg = threadName + " Local channel use and TCP/UDP use are MUTUALLY exclusive. Unable to determine what to do."; - logger.error(msg); - throw new IllegalArgumentException(msg); - } - - localChannelName = config.localChannelName; - hostName = config.host; - - Bootstrap localBootstrap = null; - Bootstrap tcpBootstrap = null; - Bootstrap udpBootstrap = null; - - - if (config.localChannelName != null) { - localBootstrap = new Bootstrap(); - } - - if (config.tcpPort > 0 || config.udpPort > 0) { - if (config.host == null) { - throw new IllegalArgumentException("You must define what host you want to connect to."); - } - - if (config.host.equals("0.0.0.0")) { - throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to."); - } - } - - if (config.tcpPort > 0) { - tcpBootstrap = new Bootstrap(); - } - - if (config.udpPort > 0) { - udpBootstrap = new Bootstrap(); - } - - if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) { - throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port"); - } - - - - if (config.localChannelName != null) { - // no networked bootstraps. LOCAL connection only - - bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap)); - - localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS")) - .channel(LocalChannel.class) - .remoteAddress(new LocalAddress(config.localChannelName)) - .handler(new RegistrationLocalHandlerClient(threadName, (RegistrationWrapperClient) registrationWrapper)); - } - - - EventLoopGroup workerEventLoop = null; - if (tcpBootstrap != null || udpBootstrap != null) { - workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); - } - - - if (tcpBootstrap != null) { - bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap)); - - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - tcpBootstrap.channel(OioSocketChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - tcpBootstrap.channel(EpollSocketChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - tcpBootstrap.channel(KQueueSocketChannel.class); - } - else { - tcpBootstrap.channel(NioSocketChannel.class); - } - - tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS")) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .remoteAddress(config.host, config.tcpPort) - .handler(new RegistrationRemoteHandlerClientTCP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop)); - - // android screws up on this!! - tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) - .option(ChannelOption.SO_KEEPALIVE, true); - } - - - if (udpBootstrap != null) { - bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap)); - - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - udpBootstrap.channel(OioDatagramChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - udpBootstrap.channel(EpollDatagramChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - udpBootstrap.channel(KQueueDatagramChannel.class); - } - else { - udpBootstrap.channel(NioDatagramChannel.class); - } - - udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS")) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - // Netty4 has a default of 2048 bytes as upper limit for datagram packets. - .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize)) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .localAddress(new InetSocketAddress(0)) // bind to wildcard - .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) - .handler(new RegistrationRemoteHandlerClientUDP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop)); - - // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) - // in order to WRITE: write as normal, just make sure it ends in .255 - // in order to LISTEN: - // InetAddress group = InetAddress.getByName("203.0.113.0"); - // NioDatagramChannel.joinGroup(group); - // THEN once done - // NioDatagramChannel.leaveGroup(group), close the socket - udpBootstrap.option(ChannelOption.SO_BROADCAST, false) - .option(ChannelOption.SO_SNDBUF, udpMaxSize); - } - } - - /** - * Allows the client to reconnect to the last connected server - * - * @throws IOException - * if the client is unable to reconnect in the previously requested connection-timeout - */ - public - void reconnect() throws IOException { - reconnect(connectionTimeout); - } - - /** - * Allows the client to reconnect to the last connected server - * - * @throws IOException - * if the client is unable to reconnect in the requested time - */ - public - void reconnect(final int connectionTimeout) throws IOException { - // make sure we are closed first - close(); - - connect(connectionTimeout); - } - - - /** - * will attempt to connect to the server, with a 30 second timeout. - * - * @throws IOException - * if the client is unable to connect in 30 seconds - */ - public - void connect() throws IOException { - connect(30000); - } - - /** - * will attempt to connect to the server, and will the specified timeout. - *

- * will BLOCK until completed - * - * @param connectionTimeout - * wait for x milliseconds. 0 will wait indefinitely - * - * @throws IOException - * if the client is unable to connect in the requested time - */ - public - void connect(final int connectionTimeout) throws IOException { - this.connectionTimeout = connectionTimeout; - - // make sure we are not trying to connect during a close or stop event. - // This will wait until we have finished shutting down. - synchronized (shutdownInProgress) { - } - - // if we are in the SAME thread as netty -- start in a new thread (otherwise we will deadlock) - if (isNettyThread()) { - runNewThread("Restart Thread", new Runnable(){ - @Override - public - void run() { - try { - connect(connectionTimeout); - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - - return; - } - - - - if (isShutdown()) { - throw new IOException("Unable to connect when shutdown..."); - } - - if (localChannelName != null) { - logger.info("Connecting to local server: {}", localChannelName); - } - else { - if (config.tcpPort > 0 && config.udpPort > 0) { - logger.info("Connecting to TCP/UDP server [{}:{}]", hostName, config.tcpPort, config.udpPort); - } - else if (config.tcpPort > 0) { - logger.info("Connecting to TCP server [{}:{}]", hostName, config.tcpPort); - } - else { - logger.info("Connecting to UDP server [{}:{}]", hostName, config.udpPort); - } - } - - // have to start the registration process. This will wait until registration is complete and RMI methods are initialized - // if this is called in the event dispatch thread for netty, it will deadlock! - startRegistration(); - - - if (config.tcpPort == 0 && config.udpPort > 0) { - // AFTER registration is complete, if we are UDP only -- setup a heartbeat (must be the larger of 2x the idle timeout OR 10 seconds) - startUdpHeartbeat(); - } - } - - @Override - public - boolean hasRemoteKeyChanged() { - return connection.hasRemoteKeyChanged(); - } - - /** - * @return the remote address, as a string. - */ - @Override - public - String getRemoteHost() { - return connection.getRemoteHost(); - } - - /** - * @return true if this connection is established on the loopback interface - */ - @Override - public - boolean isLoopback() { - return connection.isLoopback(); - } - - /** - * @return the connection (TCP or LOCAL) id of this connection. - */ - @Override - public - int id() { - return connection.id(); - } - - /** - * @return the connection (TCP or LOCAL) id of this connection as a HEX string. - */ - @Override - public - String idAsHex() { - return connection.idAsHex(); - } - - @Override - public - boolean hasUDP() { - return connection.hasUDP(); - } - - /** - * Expose methods to send objects to a destination when the connection has become idle. - */ - @Override - public - IdleBridge sendOnIdle(IdleSender sender) { - return connection.sendOnIdle(sender); - } - - /** - * Expose methods to send objects to a destination when the connection has become idle. - */ - @Override - public - IdleBridge sendOnIdle(Object message) { - return connection.sendOnIdle(message); - } - - /** - * Marks the connection to be closed as soon as possible. This is evaluated when the current - * thread execution returns to the network stack. - */ - @Override - public - void closeAsap() { - connection.closeAsap(); - } - - /** - * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" - * to an object that is created remotely. - *

- * The callback will be notified when the remote object has been created. - *

- *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the - * {@link RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must - * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a - * void return value can be called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side - * will have the proxy object replaced with the registered (non-proxy) object. - *

- * If one wishes to change the default behavior, cast the object to access the different methods. - * ie: `RemoteObject remoteObject = (RemoteObject) test;` - * - * @see RemoteObject - */ - @Override - public - void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { - try { - connection.createRemoteObject(interfaceClass, callback); - } catch (NullPointerException e) { - logger.error("Error creating remote object!", e); - } - } - - /** - * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" - * to an object that is created remotely. - *

- * The callback will be notified when the remote object has been created. - *

- *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the - * {@link RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must - * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a - * void return value can be called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side - * will have the proxy object replaced with the registered (non-proxy) object. - *

- * If one wishes to change the default behavior, cast the object to access the different methods. - * ie: `RemoteObject remoteObject = (RemoteObject) test;` - * - * @see RemoteObject - */ - @Override - public - void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { - try { - connection.getRemoteObject(objectId, callback); - } catch (NullPointerException e) { - logger.error("Error getting remote object!", e); - } - } - - /** - * Fetches the connection used by the client. - *

- * Make sure that you only call this after the client connects! - *

- * This is preferred to {@link EndPoint#getConnections()}, as it properly does some error checking - */ - @SuppressWarnings("unchecked") - public - C getConnection() { - return (C) connection; - } - - /** - * Closes all connections ONLY (keeps the client running), does not remove any listeners. To STOP the client, use stop(). - *

- * This is used, for example, when reconnecting to a server. - */ - @Override - public - void close() { - closeConnection(); - } - - /** - * Checks to see if this client has connected yet or not. - * - * @return true if we are connected, false otherwise. - */ - public - boolean isConnected() { - return super.isConnected.get(); - } -} - +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network; + +import static dorkbox.network.pipeline.ConnectionType.LOCAL; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import dorkbox.network.connection.BootstrapWrapper; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointClient; +import dorkbox.network.connection.RegistrationWrapperClient; +import dorkbox.network.connection.idle.IdleBridge; +import dorkbox.network.connection.idle.IdleSender; +import dorkbox.network.connection.registration.local.RegistrationLocalHandlerClient; +import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientTCP; +import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientUDP; +import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.RemoteObjectCallback; +import dorkbox.network.rmi.TimeoutException; +import dorkbox.util.OS; +import dorkbox.util.exceptions.SecurityException; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.kqueue.KQueueDatagramChannel; +import io.netty.channel.kqueue.KQueueSocketChannel; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.oio.OioDatagramChannel; +import io.netty.channel.socket.oio.OioSocketChannel; + +/** + * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's + * ASYNC. + */ +@SuppressWarnings({"unused", "WeakerAccess"}) +public +class Client extends EndPointClient implements Connection { + /** + * Gets the version number. + */ + public static + String getVersion() { + return "4.1"; + } + + private final String localChannelName; + private final String hostName; + private Configuration config; + + /** + * Starts a LOCAL only client, with the default local channel name and serialization scheme + */ + public + Client() throws SecurityException { + this(Configuration.localOnly()); + } + + /** + * Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme + */ + public + Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException { + this(new Configuration(host, tcpPort, udpPort, localChannelName)); + } + + /** + * Starts a REMOTE only client, which will connect to the specified host using the specified Connections Options + */ + @SuppressWarnings("AutoBoxing") + public + Client(final Configuration config) throws SecurityException { + super(config); + + String threadName = Client.class.getSimpleName(); + + this.config = config; + boolean hostConfigured = (config.tcpPort > 0 || config.udpPort > 0) && config.host != null; + boolean isLocalChannel = config.localChannelName != null; + + if (isLocalChannel && hostConfigured) { + String msg = threadName + " Local channel use and TCP/UDP use are MUTUALLY exclusive. Unable to determine what to do."; + logger.error(msg); + throw new IllegalArgumentException(msg); + } + + localChannelName = config.localChannelName; + hostName = config.host; + + Bootstrap localBootstrap = null; + Bootstrap tcpBootstrap = null; + Bootstrap udpBootstrap = null; + + + if (config.localChannelName != null) { + localBootstrap = new Bootstrap(); + } + + if (config.tcpPort > 0 || config.udpPort > 0) { + if (config.host == null) { + throw new IllegalArgumentException("You must define what host you want to connect to."); + } + + if (config.host.equals("0.0.0.0")) { + throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to."); + } + } + + if (config.tcpPort > 0) { + tcpBootstrap = new Bootstrap(); + } + + if (config.udpPort > 0) { + udpBootstrap = new Bootstrap(); + } + + if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) { + throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port"); + } + + + + if (config.localChannelName != null) { + // no networked bootstraps. LOCAL connection only + + bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap)); + + localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS")) + .channel(LocalChannel.class) + .remoteAddress(new LocalAddress(config.localChannelName)) + .handler(new RegistrationLocalHandlerClient(threadName, (RegistrationWrapperClient) registrationWrapper)); + } + + + EventLoopGroup workerEventLoop = null; + if (tcpBootstrap != null || udpBootstrap != null) { + workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); + } + + + if (tcpBootstrap != null) { + bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap)); + + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + tcpBootstrap.channel(OioSocketChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + tcpBootstrap.channel(EpollSocketChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + tcpBootstrap.channel(KQueueSocketChannel.class); + } + else { + tcpBootstrap.channel(NioSocketChannel.class); + } + + tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS")) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + .remoteAddress(config.host, config.tcpPort) + .handler(new RegistrationRemoteHandlerClientTCP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop)); + + // android screws up on this!! + tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) + .option(ChannelOption.SO_KEEPALIVE, true); + } + + + if (udpBootstrap != null) { + bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap)); + + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + udpBootstrap.channel(OioDatagramChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + udpBootstrap.channel(EpollDatagramChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + udpBootstrap.channel(KQueueDatagramChannel.class); + } + else { + udpBootstrap.channel(NioDatagramChannel.class); + } + + udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS")) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + // Netty4 has a default of 2048 bytes as upper limit for datagram packets. + .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize)) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + .localAddress(new InetSocketAddress(0)) // bind to wildcard + .remoteAddress(new InetSocketAddress(config.host, config.udpPort)) + .handler(new RegistrationRemoteHandlerClientUDP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop)); + + // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0) + // in order to WRITE: write as normal, just make sure it ends in .255 + // in order to LISTEN: + // InetAddress group = InetAddress.getByName("203.0.113.0"); + // NioDatagramChannel.joinGroup(group); + // THEN once done + // NioDatagramChannel.leaveGroup(group), close the socket + udpBootstrap.option(ChannelOption.SO_BROADCAST, false) + .option(ChannelOption.SO_SNDBUF, udpMaxSize); + } + } + + /** + * Allows the client to reconnect to the last connected server + * + * @throws IOException + * if the client is unable to reconnect in the previously requested connection-timeout + */ + public + void reconnect() throws IOException { + reconnect(connectionTimeout); + } + + /** + * Allows the client to reconnect to the last connected server + * + * @throws IOException + * if the client is unable to reconnect in the requested time + */ + public + void reconnect(final int connectionTimeout) throws IOException { + // make sure we are closed first + close(); + + connect(connectionTimeout); + } + + + /** + * will attempt to connect to the server, with a 30 second timeout. + * + * @throws IOException + * if the client is unable to connect in 30 seconds + */ + public + void connect() throws IOException { + connect(30000); + } + + /** + * will attempt to connect to the server, and will the specified timeout. + *

+ * will BLOCK until completed + * + * @param connectionTimeout + * wait for x milliseconds. 0 will wait indefinitely + * + * @throws IOException + * if the client is unable to connect in the requested time + */ + public + void connect(final int connectionTimeout) throws IOException { + this.connectionTimeout = connectionTimeout; + + // make sure we are not trying to connect during a close or stop event. + // This will wait until we have finished shutting down. + synchronized (shutdownInProgress) { + } + + // if we are in the SAME thread as netty -- start in a new thread (otherwise we will deadlock) + if (isNettyThread()) { + runNewThread("Restart Thread", new Runnable(){ + @Override + public + void run() { + try { + connect(connectionTimeout); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + + return; + } + + + + if (isShutdown()) { + throw new IOException("Unable to connect when shutdown..."); + } + + if (localChannelName != null) { + logger.info("Connecting to local server: {}", localChannelName); + } + else { + if (config.tcpPort > 0 && config.udpPort > 0) { + logger.info("Connecting to TCP/UDP server [{}:{}]", hostName, config.tcpPort, config.udpPort); + } + else if (config.tcpPort > 0) { + logger.info("Connecting to TCP server [{}:{}]", hostName, config.tcpPort); + } + else { + logger.info("Connecting to UDP server [{}:{}]", hostName, config.udpPort); + } + } + + // have to start the registration process. This will wait until registration is complete and RMI methods are initialized + // if this is called in the event dispatch thread for netty, it will deadlock! + startRegistration(); + + + if (config.tcpPort == 0 && config.udpPort > 0) { + // AFTER registration is complete, if we are UDP only -- setup a heartbeat (must be the larger of 2x the idle timeout OR 10 seconds) + startUdpHeartbeat(); + } + } + + @Override + public + boolean hasRemoteKeyChanged() { + return connection.hasRemoteKeyChanged(); + } + + /** + * @return the remote address, as a string. + */ + @Override + public + String getRemoteHost() { + return connection.getRemoteHost(); + } + + /** + * @return true if this connection is established on the loopback interface + */ + @Override + public + boolean isLoopback() { + return connection.isLoopback(); + } + + /** + * @return the connection (TCP or LOCAL) id of this connection. + */ + @Override + public + int id() { + return connection.id(); + } + + /** + * @return the connection (TCP or LOCAL) id of this connection as a HEX string. + */ + @Override + public + String idAsHex() { + return connection.idAsHex(); + } + + @Override + public + boolean hasUDP() { + return connection.hasUDP(); + } + + /** + * Expose methods to send objects to a destination when the connection has become idle. + */ + @Override + public + IdleBridge sendOnIdle(IdleSender sender) { + return connection.sendOnIdle(sender); + } + + /** + * Expose methods to send objects to a destination when the connection has become idle. + */ + @Override + public + IdleBridge sendOnIdle(Object message) { + return connection.sendOnIdle(message); + } + + /** + * Marks the connection to be closed as soon as possible. This is evaluated when the current + * thread execution returns to the network stack. + */ + @Override + public + void closeAsap() { + connection.closeAsap(); + } + + /** + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must + * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a + * void return value can be called on the update thread. + *

+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + *

+ * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` + * + * @see RemoteObject + */ + @Override + public + void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { + try { + connection.createRemoteObject(interfaceClass, callback); + } catch (NullPointerException e) { + logger.error("Error creating remote object!", e); + } + } + + /** + * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map" + * to an object that is created remotely. + *

+ * The callback will be notified when the remote object has been created. + *

+ *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the + * {@link RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must + * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a + * void return value can be called on the update thread. + *

+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side + * will have the proxy object replaced with the registered (non-proxy) object. + *

+ * If one wishes to change the default behavior, cast the object to access the different methods. + * ie: `RemoteObject remoteObject = (RemoteObject) test;` + * + * @see RemoteObject + */ + @Override + public + void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { + try { + connection.getRemoteObject(objectId, callback); + } catch (NullPointerException e) { + logger.error("Error getting remote object!", e); + } + } + + /** + * Fetches the connection used by the client. + *

+ * Make sure that you only call this after the client connects! + *

+ * This is preferred to {@link EndPoint#getConnections()}, as it properly does some error checking + */ + @SuppressWarnings("unchecked") + public + C getConnection() { + return (C) connection; + } + + /** + * Closes all connections ONLY (keeps the client running), does not remove any listeners. To STOP the client, use stop(). + *

+ * This is used, for example, when reconnecting to a server. + */ + @Override + public + void close() { + closeConnection(); + } + + /** + * Checks to see if this client has connected yet or not. + * + * @return true if we are connected, false otherwise. + */ + public + boolean isConnected() { + return super.isConnected.get(); + } +} + diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index fe7360a4..39097ea8 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -1,485 +1,485 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network; - -import static dorkbox.network.pipeline.ConnectionType.LOCAL; - -import java.io.IOException; -import java.net.Socket; -import java.util.Arrays; -import java.util.List; - -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.EndPointServer; -import dorkbox.network.connection.RegistrationWrapperServer; -import dorkbox.network.connection.connectionType.ConnectionRule; -import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer; -import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP; -import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerUDP; -import dorkbox.network.pipeline.discovery.BroadcastResponse; -import dorkbox.util.OS; -import dorkbox.util.Property; -import dorkbox.util.exceptions.SecurityException; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.bootstrap.SessionBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollDatagramChannel; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.kqueue.KQueueDatagramChannel; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.oio.OioDatagramChannel; -import io.netty.channel.socket.oio.OioServerSocketChannel; -import io.netty.handler.ipfilter.IpFilterRule; -import io.netty.handler.ipfilter.IpFilterRuleType; -import io.netty.handler.ipfilter.IpSubnetFilterRule; -import io.netty.util.NetUtil; - -/** - * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the - * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections()) - *

- * To put it bluntly, ONLY have the server do work inside of a listener! - */ -public -class Server extends EndPointServer { - - /** - * Rule that will always allow LOCALHOST to connect to the server. This is not added by default - */ - public static final IpFilterRule permitLocalHostRule = new IpSubnetFilterRule(NetUtil.LOCALHOST, 32, IpFilterRuleType.ACCEPT); - - /** - * Gets the version number. - */ - public static - String getVersion() { - return "4.0"; - } - - /** - * The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when the - * queue is full, the connection is refused. - */ - @Property - public static int backlogConnectionCount = 50; - - private final ServerBootstrap localBootstrap; - private final ServerBootstrap tcpBootstrap; - private final SessionBootstrap udpBootstrap; - - private final int tcpPort; - private final int udpPort; - - private final String localChannelName; - private final String hostName; - - private volatile boolean isRunning = false; - - - /** - * Starts a LOCAL only server, with the default serialization scheme. - */ - public - Server() throws SecurityException { - this(Configuration.localOnly()); - } - - /** - * Convenience method to starts a server with the specified Connection Options - */ - @SuppressWarnings("AutoBoxing") - public - Server(Configuration config) throws SecurityException { - // watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so - // you have to make sure to use this.serialization - super(config); - - tcpPort = config.tcpPort; - udpPort = config.udpPort; - - localChannelName = config.localChannelName; - - if (config.host == null) { - // we set this to "0.0.0.0" so that it is clear that we are trying to bind to that address. - hostName = "0.0.0.0"; - - // make it clear that this is what we do (configuration wise) so that variable examination is consistent - config.host = hostName; - } - else { - hostName = config.host; - } - - - if (localChannelName != null) { - localBootstrap = new ServerBootstrap(); - } - else { - localBootstrap = null; - } - - if (tcpPort > 0) { - tcpBootstrap = new ServerBootstrap(); - } - else { - tcpBootstrap = null; - } - - if (udpPort > 0) { - // This is what allows us to have UDP behave "similar" to TCP, in that a session is established based on the port/ip of the - // remote connection. This allows us to reuse channels and have "state" for a UDP connection that normally wouldn't exist. - // Additionally, this is what responds to discovery broadcast packets - udpBootstrap = new SessionBootstrap(tcpPort, udpPort); - } - else { - udpBootstrap = null; - } - - - String threadName = Server.class.getSimpleName(); - - // always use local channels on the server. - if (localBootstrap != null) { - localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"), - newEventLoop(LOCAL, 1, threadName )) - .channel(LocalServerChannel.class) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - .localAddress(new LocalAddress(localChannelName)) - .childHandler(new RegistrationLocalHandlerServer(threadName, (RegistrationWrapperServer) registrationWrapper)); - } - - - EventLoopGroup workerEventLoop = null; - if (tcpBootstrap != null || udpBootstrap != null) { - workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); - } - - if (tcpBootstrap != null) { - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - tcpBootstrap.channel(OioServerSocketChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - tcpBootstrap.channel(EpollServerSocketChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - tcpBootstrap.channel(KQueueServerSocketChannel.class); - } - else { - tcpBootstrap.channel(NioServerSocketChannel.class); - } - - // TODO: If we use netty for an HTTP server, - // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server. - - tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"), - newEventLoop(1, threadName + "-TCP-REGISTRATION")) - .option(ChannelOption.SO_BACKLOG, backlogConnectionCount) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop)); - - // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address! - if (hostName.equals("0.0.0.0")) { - tcpBootstrap.localAddress(tcpPort); - } - else { - tcpBootstrap.localAddress(hostName, tcpPort); - } - - - // android screws up on this!! - tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) - .childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid()); - } - - - if (udpBootstrap != null) { - if (OS.isAndroid()) { - // android ONLY supports OIO (not NIO) - udpBootstrap.channel(OioDatagramChannel.class); - } - else if (OS.isLinux() && NativeLibrary.isAvailable()) { - // epoll network stack is MUCH faster (but only on linux) - udpBootstrap.channel(EpollDatagramChannel.class); - } - else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { - // KQueue network stack is MUCH faster (but only on macosx) - udpBootstrap.channel(KQueueDatagramChannel.class); - } - else { - // windows and linux/mac that are incompatible with the native implementations - udpBootstrap.channel(NioDatagramChannel.class); - } - - - // Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify - FixedRecvByteBufAllocator recvByteBufAllocator = new FixedRecvByteBufAllocator(EndPoint.udpMaxSize); - - udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"), - newEventLoop(1, threadName + "-UDP-REGISTRATION")) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) - - // a non-root user can't receive a broadcast packet on *nix if the socket is bound on non-wildcard address. - // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP - // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets) - .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722 - .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop)); - - // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! - // if (hostName.equals("0.0.0.0")) { - // udpBootstrap.localAddress(hostName, tcpPort); - // } - // else { - // udpBootstrap.localAddress(udpPort); - // } - - // Enable to READ from MULTICAST data (ie, 192.168.1.0) - // in order to WRITE: write as normal, just make sure it ends in .255 - // in order to LISTEN: - // InetAddress group = InetAddress.getByName("203.0.113.0"); - // socket.joinGroup(group); - // THEN once done - // socket.leaveGroup(group), close the socket - // Enable to WRITE to MULTICAST data (ie, 192.168.1.0) - udpBootstrap.option(ChannelOption.SO_BROADCAST, false) - .option(ChannelOption.SO_SNDBUF, udpMaxSize); - } - } - - /** - * Binds the server to the configured, underlying protocols. - *

- * This method will also BLOCK until the stop method is called, and if you want to continue running code after this method invocation, - * bind should be called in a separate, non-daemon thread. - */ - public - void bind() { - bind(true); - } - - /** - * Binds the server to the configured, underlying protocols. - *

- * This is a more advanced method, and you should consider calling bind() instead. - * - * @param blockUntilTerminate - * will BLOCK until the server stop method is called, and if you want to continue running code after this method - * invocation, bind should be called in a separate, non-daemon thread - or with false as the parameter. - */ - @SuppressWarnings("AutoBoxing") - public - void bind(boolean blockUntilTerminate) { - // make sure we are not trying to connect during a close or stop event. - // This will wait until we have finished starting up/shutting down. - synchronized (shutdownInProgress) { - } - - - // The bootstraps will be accessed ONE AT A TIME, in this order! - ChannelFuture future; - - // LOCAL - if (localBootstrap != null) { - try { - future = localBootstrap.bind(); - future.await(); - } catch (InterruptedException e) { - throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", e); - } - - if (!future.isSuccess()) { - throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", future.cause()); - } - - logger.info("Listening on LOCAL address: [{}]", localChannelName); - manageForShutdown(future); - } - - - // TCP - if (tcpBootstrap != null) { - // Wait until the connection attempt succeeds or fails. - try { - future = tcpBootstrap.bind(); - future.await(); - } catch (Exception e) { - stop(); - throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", e); - } - - if (!future.isSuccess()) { - stop(); - throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", future.cause()); - } - - logger.info("TCP server listen address [{}:{}]", hostName, tcpPort); - manageForShutdown(future); - } - - // UDP - if (udpBootstrap != null) { - // Wait until the connection attempt succeeds or fails. - try { - future = udpBootstrap.bind(); - future.await(); - } catch (Exception e) { - throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", e); - } - - if (!future.isSuccess()) { - throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", - future.cause()); - } - - logger.info("UDP server listen address [{}:{}]", hostName, udpPort); - manageForShutdown(future); - } - - isRunning = true; - - // we now BLOCK until the stop method is called. - // if we want to continue running code in the server, bind should be called in a separate, non-daemon thread. - if (blockUntilTerminate) { - waitForShutdown(); - } - } - - /** - * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server. - *

- * If there are any IP+subnet added to this list - then ONLY those are permitted (all else are denied) - *

- * If there is nothing added to this list - then ALL are permitted - */ - public - void addIpFilter(IpFilterRule... rules) { - ipFilterRules.addAll(Arrays.asList(rules)); - } - - /** - * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have. - * - NOTHING : Nothing happens to the in/out bytes - * - COMPRESS: The in/out bytes are compressed with LZ4-fast - * - COMPRESS_AND_ENCRYPT: The in/out bytes are compressed (LZ4-fast) THEN encrypted (AES-256-GCM) - * - * If no rules are defined, then for LOOPBACK, it will always be `COMPRESS` and for everything else it will always be `COMPRESS_AND_ENCRYPT`. - * - * If rules are defined, then everything by default is `COMPRESS_AND_ENCRYPT`. - * - * The compression algorithm is LZ4-fast, so there is a small performance impact for a very large gain - * Compress : 6.210 micros/op; 629.0 MB/s (output: 55.4%) - * Uncompress : 0.641 micros/op; 6097.9 MB/s - */ - public - void addConnectionTypeFilter(ConnectionRule... rules) { - connectionRules.addAll(Arrays.asList(rules)); - } - - // called when we are stopped/shut down - @Override - protected - void stopExtraActions() { - isRunning = false; - - // now WAIT until bind has released the socket - // wait a max of 10 tries - int tries = 10; - while (tries-- > 0 && isRunning(this.config)) { - logger.warn("Server has requested shutdown, but the socket is still bound. Waiting {} more times", tries); - try { - Thread.sleep(2000); - } catch (InterruptedException ignored) { - } - } - } - - /** - * @return true if this server has successfully bound to an IP address and is running - */ - public - boolean isRunning() { - return isRunning; - } - - /** - * Checks to see if a server (using the specified configuration) is running. This will check across JVMs by checking the - * network socket directly, and assumes that if the port is in use and answers, then the server is "running". This does not try to - * authenticate or validate the connection. - *

- * This does not check local-channels (which are intra-JVM only). Uses `Broadcast` to check for UDP servers - *

- * - * @return true if the configuration matches and can connect (but not verify) to the TCP control socket. - */ - public static - boolean isRunning(Configuration config) { - String host = config.host; - - // for us, we want a "null" host to connect to the "any" interface. - if (host == null) { - host = "0.0.0.0"; - } - - if (config.tcpPort > 0) { - Socket sock = null; - // since we check the socket, if we cannot connect to a socket, then we're done. - try { - sock = new Socket(host, config.tcpPort); - // if we can connect to the socket, it means that we are already running. - return sock.isConnected(); - } catch (Exception ignored) { - if (sock != null) { - try { - sock.close(); - } catch (IOException ignored2) { - } - } - } - } - - // use Broadcast to see if there is a UDP server connected - if (config.udpPort > 0) { - List broadcastResponses = null; - try { - broadcastResponses = Broadcast.discoverHosts0(null, config.udpPort, 500, true); - return !broadcastResponses.isEmpty(); - } catch (IOException ignored) { - } - } - - return false; - } -} - +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network; + +import static dorkbox.network.pipeline.ConnectionType.LOCAL; + +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; +import java.util.List; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointServer; +import dorkbox.network.connection.RegistrationWrapperServer; +import dorkbox.network.connection.connectionType.ConnectionRule; +import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer; +import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP; +import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerUDP; +import dorkbox.network.pipeline.discovery.BroadcastResponse; +import dorkbox.util.OS; +import dorkbox.util.Property; +import dorkbox.util.exceptions.SecurityException; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.bootstrap.SessionBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.kqueue.KQueueDatagramChannel; +import io.netty.channel.kqueue.KQueueServerSocketChannel; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.oio.OioDatagramChannel; +import io.netty.channel.socket.oio.OioServerSocketChannel; +import io.netty.handler.ipfilter.IpFilterRule; +import io.netty.handler.ipfilter.IpFilterRuleType; +import io.netty.handler.ipfilter.IpSubnetFilterRule; +import io.netty.util.NetUtil; + +/** + * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the + * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections()) + *

+ * To put it bluntly, ONLY have the server do work inside of a listener! + */ +public +class Server extends EndPointServer { + + /** + * Rule that will always allow LOCALHOST to connect to the server. This is not added by default + */ + public static final IpFilterRule permitLocalHostRule = new IpSubnetFilterRule(NetUtil.LOCALHOST, 32, IpFilterRuleType.ACCEPT); + + /** + * Gets the version number. + */ + public static + String getVersion() { + return "4.1"; + } + + /** + * The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when the + * queue is full, the connection is refused. + */ + @Property + public static int backlogConnectionCount = 50; + + private final ServerBootstrap localBootstrap; + private final ServerBootstrap tcpBootstrap; + private final SessionBootstrap udpBootstrap; + + private final int tcpPort; + private final int udpPort; + + private final String localChannelName; + private final String hostName; + + private volatile boolean isRunning = false; + + + /** + * Starts a LOCAL only server, with the default serialization scheme. + */ + public + Server() throws SecurityException { + this(Configuration.localOnly()); + } + + /** + * Convenience method to starts a server with the specified Connection Options + */ + @SuppressWarnings("AutoBoxing") + public + Server(Configuration config) throws SecurityException { + // watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so + // you have to make sure to use this.serialization + super(config); + + tcpPort = config.tcpPort; + udpPort = config.udpPort; + + localChannelName = config.localChannelName; + + if (config.host == null) { + // we set this to "0.0.0.0" so that it is clear that we are trying to bind to that address. + hostName = "0.0.0.0"; + + // make it clear that this is what we do (configuration wise) so that variable examination is consistent + config.host = hostName; + } + else { + hostName = config.host; + } + + + if (localChannelName != null) { + localBootstrap = new ServerBootstrap(); + } + else { + localBootstrap = null; + } + + if (tcpPort > 0) { + tcpBootstrap = new ServerBootstrap(); + } + else { + tcpBootstrap = null; + } + + if (udpPort > 0) { + // This is what allows us to have UDP behave "similar" to TCP, in that a session is established based on the port/ip of the + // remote connection. This allows us to reuse channels and have "state" for a UDP connection that normally wouldn't exist. + // Additionally, this is what responds to discovery broadcast packets + udpBootstrap = new SessionBootstrap(tcpPort, udpPort); + } + else { + udpBootstrap = null; + } + + + String threadName = Server.class.getSimpleName(); + + // always use local channels on the server. + if (localBootstrap != null) { + localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"), + newEventLoop(LOCAL, 1, threadName )) + .channel(LocalServerChannel.class) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + .localAddress(new LocalAddress(localChannelName)) + .childHandler(new RegistrationLocalHandlerServer(threadName, (RegistrationWrapperServer) registrationWrapper)); + } + + + EventLoopGroup workerEventLoop = null; + if (tcpBootstrap != null || udpBootstrap != null) { + workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName); + } + + if (tcpBootstrap != null) { + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + tcpBootstrap.channel(OioServerSocketChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + tcpBootstrap.channel(EpollServerSocketChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + tcpBootstrap.channel(KQueueServerSocketChannel.class); + } + else { + tcpBootstrap.channel(NioServerSocketChannel.class); + } + + // TODO: If we use netty for an HTTP server, + // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server. + + tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"), + newEventLoop(1, threadName + "-TCP-REGISTRATION")) + .option(ChannelOption.SO_BACKLOG, backlogConnectionCount) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop)); + + // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address! + if (hostName.equals("0.0.0.0")) { + tcpBootstrap.localAddress(tcpPort); + } + else { + tcpBootstrap.localAddress(hostName, tcpPort); + } + + + // android screws up on this!! + tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid()) + .childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid()); + } + + + if (udpBootstrap != null) { + if (OS.isAndroid()) { + // android ONLY supports OIO (not NIO) + udpBootstrap.channel(OioDatagramChannel.class); + } + else if (OS.isLinux() && NativeLibrary.isAvailable()) { + // epoll network stack is MUCH faster (but only on linux) + udpBootstrap.channel(EpollDatagramChannel.class); + } + else if (OS.isMacOsX() && NativeLibrary.isAvailable()) { + // KQueue network stack is MUCH faster (but only on macosx) + udpBootstrap.channel(KQueueDatagramChannel.class); + } + else { + // windows and linux/mac that are incompatible with the native implementations + udpBootstrap.channel(NioDatagramChannel.class); + } + + + // Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify + FixedRecvByteBufAllocator recvByteBufAllocator = new FixedRecvByteBufAllocator(EndPoint.udpMaxSize); + + udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"), + newEventLoop(1, threadName + "-UDP-REGISTRATION")) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH)) + + // a non-root user can't receive a broadcast packet on *nix if the socket is bound on non-wildcard address. + // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP + // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets) + .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722 + .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop)); + + // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address! + // if (hostName.equals("0.0.0.0")) { + // udpBootstrap.localAddress(hostName, tcpPort); + // } + // else { + // udpBootstrap.localAddress(udpPort); + // } + + // Enable to READ from MULTICAST data (ie, 192.168.1.0) + // in order to WRITE: write as normal, just make sure it ends in .255 + // in order to LISTEN: + // InetAddress group = InetAddress.getByName("203.0.113.0"); + // socket.joinGroup(group); + // THEN once done + // socket.leaveGroup(group), close the socket + // Enable to WRITE to MULTICAST data (ie, 192.168.1.0) + udpBootstrap.option(ChannelOption.SO_BROADCAST, false) + .option(ChannelOption.SO_SNDBUF, udpMaxSize); + } + } + + /** + * Binds the server to the configured, underlying protocols. + *

+ * This method will also BLOCK until the stop method is called, and if you want to continue running code after this method invocation, + * bind should be called in a separate, non-daemon thread. + */ + public + void bind() { + bind(true); + } + + /** + * Binds the server to the configured, underlying protocols. + *

+ * This is a more advanced method, and you should consider calling bind() instead. + * + * @param blockUntilTerminate + * will BLOCK until the server stop method is called, and if you want to continue running code after this method + * invocation, bind should be called in a separate, non-daemon thread - or with false as the parameter. + */ + @SuppressWarnings("AutoBoxing") + public + void bind(boolean blockUntilTerminate) { + // make sure we are not trying to connect during a close or stop event. + // This will wait until we have finished starting up/shutting down. + synchronized (shutdownInProgress) { + } + + + // The bootstraps will be accessed ONE AT A TIME, in this order! + ChannelFuture future; + + // LOCAL + if (localBootstrap != null) { + try { + future = localBootstrap.bind(); + future.await(); + } catch (InterruptedException e) { + throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", e); + } + + if (!future.isSuccess()) { + throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", future.cause()); + } + + logger.info("Listening on LOCAL address: [{}]", localChannelName); + manageForShutdown(future); + } + + + // TCP + if (tcpBootstrap != null) { + // Wait until the connection attempt succeeds or fails. + try { + future = tcpBootstrap.bind(); + future.await(); + } catch (Exception e) { + stop(); + throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", e); + } + + if (!future.isSuccess()) { + stop(); + throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", future.cause()); + } + + logger.info("TCP server listen address [{}:{}]", hostName, tcpPort); + manageForShutdown(future); + } + + // UDP + if (udpBootstrap != null) { + // Wait until the connection attempt succeeds or fails. + try { + future = udpBootstrap.bind(); + future.await(); + } catch (Exception e) { + throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", e); + } + + if (!future.isSuccess()) { + throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", + future.cause()); + } + + logger.info("UDP server listen address [{}:{}]", hostName, udpPort); + manageForShutdown(future); + } + + isRunning = true; + + // we now BLOCK until the stop method is called. + // if we want to continue running code in the server, bind should be called in a separate, non-daemon thread. + if (blockUntilTerminate) { + waitForShutdown(); + } + } + + /** + * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server. + *

+ * If there are any IP+subnet added to this list - then ONLY those are permitted (all else are denied) + *

+ * If there is nothing added to this list - then ALL are permitted + */ + public + void addIpFilter(IpFilterRule... rules) { + ipFilterRules.addAll(Arrays.asList(rules)); + } + + /** + * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have. + * - NOTHING : Nothing happens to the in/out bytes + * - COMPRESS: The in/out bytes are compressed with LZ4-fast + * - COMPRESS_AND_ENCRYPT: The in/out bytes are compressed (LZ4-fast) THEN encrypted (AES-256-GCM) + * + * If no rules are defined, then for LOOPBACK, it will always be `COMPRESS` and for everything else it will always be `COMPRESS_AND_ENCRYPT`. + * + * If rules are defined, then everything by default is `COMPRESS_AND_ENCRYPT`. + * + * The compression algorithm is LZ4-fast, so there is a small performance impact for a very large gain + * Compress : 6.210 micros/op; 629.0 MB/s (output: 55.4%) + * Uncompress : 0.641 micros/op; 6097.9 MB/s + */ + public + void addConnectionTypeFilter(ConnectionRule... rules) { + connectionRules.addAll(Arrays.asList(rules)); + } + + // called when we are stopped/shut down + @Override + protected + void stopExtraActions() { + isRunning = false; + + // now WAIT until bind has released the socket + // wait a max of 10 tries + int tries = 10; + while (tries-- > 0 && isRunning(this.config)) { + logger.warn("Server has requested shutdown, but the socket is still bound. Waiting {} more times", tries); + try { + Thread.sleep(2000); + } catch (InterruptedException ignored) { + } + } + } + + /** + * @return true if this server has successfully bound to an IP address and is running + */ + public + boolean isRunning() { + return isRunning; + } + + /** + * Checks to see if a server (using the specified configuration) is running. This will check across JVMs by checking the + * network socket directly, and assumes that if the port is in use and answers, then the server is "running". This does not try to + * authenticate or validate the connection. + *

+ * This does not check local-channels (which are intra-JVM only). Uses `Broadcast` to check for UDP servers + *

+ * + * @return true if the configuration matches and can connect (but not verify) to the TCP control socket. + */ + public static + boolean isRunning(Configuration config) { + String host = config.host; + + // for us, we want a "null" host to connect to the "any" interface. + if (host == null) { + host = "0.0.0.0"; + } + + if (config.tcpPort > 0) { + Socket sock = null; + // since we check the socket, if we cannot connect to a socket, then we're done. + try { + sock = new Socket(host, config.tcpPort); + // if we can connect to the socket, it means that we are already running. + return sock.isConnected(); + } catch (Exception ignored) { + if (sock != null) { + try { + sock.close(); + } catch (IOException ignored2) { + } + } + } + } + + // use Broadcast to see if there is a UDP server connected + if (config.udpPort > 0) { + List broadcastResponses = null; + try { + broadcastResponses = Broadcast.discoverHosts0(null, config.udpPort, 500, true); + return !broadcastResponses.isEmpty(); + } catch (IOException ignored) { + } + } + + return false; + } +} +