extends EndPointClient implements Connection {
- /**
- * Gets the version number.
- */
- public static
- String getVersion() {
- return "4.0";
- }
-
- private final String localChannelName;
- private final String hostName;
- private Configuration config;
-
- /**
- * Starts a LOCAL only client, with the default local channel name and serialization scheme
- */
- public
- Client() throws SecurityException {
- this(Configuration.localOnly());
- }
-
- /**
- * Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme
- */
- public
- Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException {
- this(new Configuration(host, tcpPort, udpPort, localChannelName));
- }
-
- /**
- * Starts a REMOTE only client, which will connect to the specified host using the specified Connections Options
- */
- @SuppressWarnings("AutoBoxing")
- public
- Client(final Configuration config) throws SecurityException {
- super(config);
-
- String threadName = Client.class.getSimpleName();
-
- this.config = config;
- boolean hostConfigured = (config.tcpPort > 0 || config.udpPort > 0) && config.host != null;
- boolean isLocalChannel = config.localChannelName != null;
-
- if (isLocalChannel && hostConfigured) {
- String msg = threadName + " Local channel use and TCP/UDP use are MUTUALLY exclusive. Unable to determine what to do.";
- logger.error(msg);
- throw new IllegalArgumentException(msg);
- }
-
- localChannelName = config.localChannelName;
- hostName = config.host;
-
- Bootstrap localBootstrap = null;
- Bootstrap tcpBootstrap = null;
- Bootstrap udpBootstrap = null;
-
-
- if (config.localChannelName != null) {
- localBootstrap = new Bootstrap();
- }
-
- if (config.tcpPort > 0 || config.udpPort > 0) {
- if (config.host == null) {
- throw new IllegalArgumentException("You must define what host you want to connect to.");
- }
-
- if (config.host.equals("0.0.0.0")) {
- throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to.");
- }
- }
-
- if (config.tcpPort > 0) {
- tcpBootstrap = new Bootstrap();
- }
-
- if (config.udpPort > 0) {
- udpBootstrap = new Bootstrap();
- }
-
- if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) {
- throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port");
- }
-
-
-
- if (config.localChannelName != null) {
- // no networked bootstraps. LOCAL connection only
-
- bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap));
-
- localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"))
- .channel(LocalChannel.class)
- .remoteAddress(new LocalAddress(config.localChannelName))
- .handler(new RegistrationLocalHandlerClient(threadName, (RegistrationWrapperClient) registrationWrapper));
- }
-
-
- EventLoopGroup workerEventLoop = null;
- if (tcpBootstrap != null || udpBootstrap != null) {
- workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
- }
-
-
- if (tcpBootstrap != null) {
- bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap));
-
- if (OS.isAndroid()) {
- // android ONLY supports OIO (not NIO)
- tcpBootstrap.channel(OioSocketChannel.class);
- }
- else if (OS.isLinux() && NativeLibrary.isAvailable()) {
- // epoll network stack is MUCH faster (but only on linux)
- tcpBootstrap.channel(EpollSocketChannel.class);
- }
- else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
- // KQueue network stack is MUCH faster (but only on macosx)
- tcpBootstrap.channel(KQueueSocketChannel.class);
- }
- else {
- tcpBootstrap.channel(NioSocketChannel.class);
- }
-
- tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
- .remoteAddress(config.host, config.tcpPort)
- .handler(new RegistrationRemoteHandlerClientTCP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop));
-
- // android screws up on this!!
- tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
- .option(ChannelOption.SO_KEEPALIVE, true);
- }
-
-
- if (udpBootstrap != null) {
- bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap));
-
- if (OS.isAndroid()) {
- // android ONLY supports OIO (not NIO)
- udpBootstrap.channel(OioDatagramChannel.class);
- }
- else if (OS.isLinux() && NativeLibrary.isAvailable()) {
- // epoll network stack is MUCH faster (but only on linux)
- udpBootstrap.channel(EpollDatagramChannel.class);
- }
- else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
- // KQueue network stack is MUCH faster (but only on macosx)
- udpBootstrap.channel(KQueueDatagramChannel.class);
- }
- else {
- udpBootstrap.channel(NioDatagramChannel.class);
- }
-
- udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- // Netty4 has a default of 2048 bytes as upper limit for datagram packets.
- .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize))
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
- .localAddress(new InetSocketAddress(0)) // bind to wildcard
- .remoteAddress(new InetSocketAddress(config.host, config.udpPort))
- .handler(new RegistrationRemoteHandlerClientUDP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop));
-
- // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
- // in order to WRITE: write as normal, just make sure it ends in .255
- // in order to LISTEN:
- // InetAddress group = InetAddress.getByName("203.0.113.0");
- // NioDatagramChannel.joinGroup(group);
- // THEN once done
- // NioDatagramChannel.leaveGroup(group), close the socket
- udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
- .option(ChannelOption.SO_SNDBUF, udpMaxSize);
- }
- }
-
- /**
- * Allows the client to reconnect to the last connected server
- *
- * @throws IOException
- * if the client is unable to reconnect in the previously requested connection-timeout
- */
- public
- void reconnect() throws IOException {
- reconnect(connectionTimeout);
- }
-
- /**
- * Allows the client to reconnect to the last connected server
- *
- * @throws IOException
- * if the client is unable to reconnect in the requested time
- */
- public
- void reconnect(final int connectionTimeout) throws IOException {
- // make sure we are closed first
- close();
-
- connect(connectionTimeout);
- }
-
-
- /**
- * will attempt to connect to the server, with a 30 second timeout.
- *
- * @throws IOException
- * if the client is unable to connect in 30 seconds
- */
- public
- void connect() throws IOException {
- connect(30000);
- }
-
- /**
- * will attempt to connect to the server, and will the specified timeout.
- *
- * will BLOCK until completed
- *
- * @param connectionTimeout
- * wait for x milliseconds. 0 will wait indefinitely
- *
- * @throws IOException
- * if the client is unable to connect in the requested time
- */
- public
- void connect(final int connectionTimeout) throws IOException {
- this.connectionTimeout = connectionTimeout;
-
- // make sure we are not trying to connect during a close or stop event.
- // This will wait until we have finished shutting down.
- synchronized (shutdownInProgress) {
- }
-
- // if we are in the SAME thread as netty -- start in a new thread (otherwise we will deadlock)
- if (isNettyThread()) {
- runNewThread("Restart Thread", new Runnable(){
- @Override
- public
- void run() {
- try {
- connect(connectionTimeout);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
-
- return;
- }
-
-
-
- if (isShutdown()) {
- throw new IOException("Unable to connect when shutdown...");
- }
-
- if (localChannelName != null) {
- logger.info("Connecting to local server: {}", localChannelName);
- }
- else {
- if (config.tcpPort > 0 && config.udpPort > 0) {
- logger.info("Connecting to TCP/UDP server [{}:{}]", hostName, config.tcpPort, config.udpPort);
- }
- else if (config.tcpPort > 0) {
- logger.info("Connecting to TCP server [{}:{}]", hostName, config.tcpPort);
- }
- else {
- logger.info("Connecting to UDP server [{}:{}]", hostName, config.udpPort);
- }
- }
-
- // have to start the registration process. This will wait until registration is complete and RMI methods are initialized
- // if this is called in the event dispatch thread for netty, it will deadlock!
- startRegistration();
-
-
- if (config.tcpPort == 0 && config.udpPort > 0) {
- // AFTER registration is complete, if we are UDP only -- setup a heartbeat (must be the larger of 2x the idle timeout OR 10 seconds)
- startUdpHeartbeat();
- }
- }
-
- @Override
- public
- boolean hasRemoteKeyChanged() {
- return connection.hasRemoteKeyChanged();
- }
-
- /**
- * @return the remote address, as a string.
- */
- @Override
- public
- String getRemoteHost() {
- return connection.getRemoteHost();
- }
-
- /**
- * @return true if this connection is established on the loopback interface
- */
- @Override
- public
- boolean isLoopback() {
- return connection.isLoopback();
- }
-
- /**
- * @return the connection (TCP or LOCAL) id of this connection.
- */
- @Override
- public
- int id() {
- return connection.id();
- }
-
- /**
- * @return the connection (TCP or LOCAL) id of this connection as a HEX string.
- */
- @Override
- public
- String idAsHex() {
- return connection.idAsHex();
- }
-
- @Override
- public
- boolean hasUDP() {
- return connection.hasUDP();
- }
-
- /**
- * Expose methods to send objects to a destination when the connection has become idle.
- */
- @Override
- public
- IdleBridge sendOnIdle(IdleSender, ?> sender) {
- return connection.sendOnIdle(sender);
- }
-
- /**
- * Expose methods to send objects to a destination when the connection has become idle.
- */
- @Override
- public
- IdleBridge sendOnIdle(Object message) {
- return connection.sendOnIdle(message);
- }
-
- /**
- * Marks the connection to be closed as soon as possible. This is evaluated when the current
- * thread execution returns to the network stack.
- */
- @Override
- public
- void closeAsap() {
- connection.closeAsap();
- }
-
- /**
- * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map"
- * to an object that is created remotely.
- *
- * The callback will be notified when the remote object has been created.
- *
- *
- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the
- * {@link RemoteObject#setResponseTimeout(int) response timeout}.
- *
- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must
- * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a
- * void return value can be called on the update thread.
- *
- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
- * will have the proxy object replaced with the registered (non-proxy) object.
- *
- * If one wishes to change the default behavior, cast the object to access the different methods.
- * ie: `RemoteObject remoteObject = (RemoteObject) test;`
- *
- * @see RemoteObject
- */
- @Override
- public
- void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) {
- try {
- connection.createRemoteObject(interfaceClass, callback);
- } catch (NullPointerException e) {
- logger.error("Error creating remote object!", e);
- }
- }
-
- /**
- * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map"
- * to an object that is created remotely.
- *
- * The callback will be notified when the remote object has been created.
- *
- *
- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the
- * {@link RemoteObject#setResponseTimeout(int) response timeout}.
- *
- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must
- * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a
- * void return value can be called on the update thread.
- *
- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
- * will have the proxy object replaced with the registered (non-proxy) object.
- *
- * If one wishes to change the default behavior, cast the object to access the different methods.
- * ie: `RemoteObject remoteObject = (RemoteObject) test;`
- *
- * @see RemoteObject
- */
- @Override
- public
- void getRemoteObject(final int objectId, final RemoteObjectCallback callback) {
- try {
- connection.getRemoteObject(objectId, callback);
- } catch (NullPointerException e) {
- logger.error("Error getting remote object!", e);
- }
- }
-
- /**
- * Fetches the connection used by the client.
- *
- * Make sure that you only call this after the client connects!
- *
- * This is preferred to {@link EndPoint#getConnections()}, as it properly does some error checking
- */
- @SuppressWarnings("unchecked")
- public
- C getConnection() {
- return (C) connection;
- }
-
- /**
- * Closes all connections ONLY (keeps the client running), does not remove any listeners. To STOP the client, use stop().
- *
- * This is used, for example, when reconnecting to a server.
- */
- @Override
- public
- void close() {
- closeConnection();
- }
-
- /**
- * Checks to see if this client has connected yet or not.
- *
- * @return true if we are connected, false otherwise.
- */
- public
- boolean isConnected() {
- return super.isConnected.get();
- }
-}
-
+/*
+ * Copyright 2010 dorkbox, llc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dorkbox.network;
+
+import static dorkbox.network.pipeline.ConnectionType.LOCAL;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import dorkbox.network.connection.BootstrapWrapper;
+import dorkbox.network.connection.Connection;
+import dorkbox.network.connection.EndPoint;
+import dorkbox.network.connection.EndPointClient;
+import dorkbox.network.connection.RegistrationWrapperClient;
+import dorkbox.network.connection.idle.IdleBridge;
+import dorkbox.network.connection.idle.IdleSender;
+import dorkbox.network.connection.registration.local.RegistrationLocalHandlerClient;
+import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientTCP;
+import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerClientUDP;
+import dorkbox.network.rmi.RemoteObject;
+import dorkbox.network.rmi.RemoteObjectCallback;
+import dorkbox.network.rmi.TimeoutException;
+import dorkbox.util.OS;
+import dorkbox.util.exceptions.SecurityException;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueueDatagramChannel;
+import io.netty.channel.kqueue.KQueueSocketChannel;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.socket.oio.OioDatagramChannel;
+import io.netty.channel.socket.oio.OioSocketChannel;
+
+/**
+ * The client is both SYNC and ASYNC. It starts off SYNC (blocks thread until it's done), then once it's connected to the server, it's
+ * ASYNC.
+ */
+@SuppressWarnings({"unused", "WeakerAccess"})
+public
+class Client extends EndPointClient implements Connection {
+ /**
+ * Gets the version number.
+ */
+ public static
+ String getVersion() {
+ return "4.1";
+ }
+
+ private final String localChannelName;
+ private final String hostName;
+ private Configuration config;
+
+ /**
+ * Starts a LOCAL only client, with the default local channel name and serialization scheme
+ */
+ public
+ Client() throws SecurityException {
+ this(Configuration.localOnly());
+ }
+
+ /**
+ * Starts a TCP & UDP client (or a LOCAL client), with the specified serialization scheme
+ */
+ public
+ Client(String host, int tcpPort, int udpPort, String localChannelName) throws SecurityException {
+ this(new Configuration(host, tcpPort, udpPort, localChannelName));
+ }
+
+ /**
+ * Starts a REMOTE only client, which will connect to the specified host using the specified Connections Options
+ */
+ @SuppressWarnings("AutoBoxing")
+ public
+ Client(final Configuration config) throws SecurityException {
+ super(config);
+
+ String threadName = Client.class.getSimpleName();
+
+ this.config = config;
+ boolean hostConfigured = (config.tcpPort > 0 || config.udpPort > 0) && config.host != null;
+ boolean isLocalChannel = config.localChannelName != null;
+
+ if (isLocalChannel && hostConfigured) {
+ String msg = threadName + " Local channel use and TCP/UDP use are MUTUALLY exclusive. Unable to determine what to do.";
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ localChannelName = config.localChannelName;
+ hostName = config.host;
+
+ Bootstrap localBootstrap = null;
+ Bootstrap tcpBootstrap = null;
+ Bootstrap udpBootstrap = null;
+
+
+ if (config.localChannelName != null) {
+ localBootstrap = new Bootstrap();
+ }
+
+ if (config.tcpPort > 0 || config.udpPort > 0) {
+ if (config.host == null) {
+ throw new IllegalArgumentException("You must define what host you want to connect to.");
+ }
+
+ if (config.host.equals("0.0.0.0")) {
+ throw new IllegalArgumentException("You cannot connect to 0.0.0.0, you must define what host you want to connect to.");
+ }
+ }
+
+ if (config.tcpPort > 0) {
+ tcpBootstrap = new Bootstrap();
+ }
+
+ if (config.udpPort > 0) {
+ udpBootstrap = new Bootstrap();
+ }
+
+ if (localBootstrap == null && tcpBootstrap == null && udpBootstrap == null) {
+ throw new IllegalArgumentException("You must define how you want to connect, either LOCAL channel name, TCP port, or UDP port");
+ }
+
+
+
+ if (config.localChannelName != null) {
+ // no networked bootstraps. LOCAL connection only
+
+ bootstraps.add(new BootstrapWrapper("LOCAL", config.localChannelName, -1, localBootstrap));
+
+ localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"))
+ .channel(LocalChannel.class)
+ .remoteAddress(new LocalAddress(config.localChannelName))
+ .handler(new RegistrationLocalHandlerClient(threadName, (RegistrationWrapperClient) registrationWrapper));
+ }
+
+
+ EventLoopGroup workerEventLoop = null;
+ if (tcpBootstrap != null || udpBootstrap != null) {
+ workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
+ }
+
+
+ if (tcpBootstrap != null) {
+ bootstraps.add(new BootstrapWrapper("TCP", config.host, config.tcpPort, tcpBootstrap));
+
+ if (OS.isAndroid()) {
+ // android ONLY supports OIO (not NIO)
+ tcpBootstrap.channel(OioSocketChannel.class);
+ }
+ else if (OS.isLinux() && NativeLibrary.isAvailable()) {
+ // epoll network stack is MUCH faster (but only on linux)
+ tcpBootstrap.channel(EpollSocketChannel.class);
+ }
+ else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
+ // KQueue network stack is MUCH faster (but only on macosx)
+ tcpBootstrap.channel(KQueueSocketChannel.class);
+ }
+ else {
+ tcpBootstrap.channel(NioSocketChannel.class);
+ }
+
+ tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
+ .remoteAddress(config.host, config.tcpPort)
+ .handler(new RegistrationRemoteHandlerClientTCP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop));
+
+ // android screws up on this!!
+ tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
+ .option(ChannelOption.SO_KEEPALIVE, true);
+ }
+
+
+ if (udpBootstrap != null) {
+ bootstraps.add(new BootstrapWrapper("UDP", config.host, config.udpPort, udpBootstrap));
+
+ if (OS.isAndroid()) {
+ // android ONLY supports OIO (not NIO)
+ udpBootstrap.channel(OioDatagramChannel.class);
+ }
+ else if (OS.isLinux() && NativeLibrary.isAvailable()) {
+ // epoll network stack is MUCH faster (but only on linux)
+ udpBootstrap.channel(EpollDatagramChannel.class);
+ }
+ else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
+ // KQueue network stack is MUCH faster (but only on macosx)
+ udpBootstrap.channel(KQueueDatagramChannel.class);
+ }
+ else {
+ udpBootstrap.channel(NioDatagramChannel.class);
+ }
+
+ udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ // Netty4 has a default of 2048 bytes as upper limit for datagram packets.
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(EndPoint.udpMaxSize))
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
+ .localAddress(new InetSocketAddress(0)) // bind to wildcard
+ .remoteAddress(new InetSocketAddress(config.host, config.udpPort))
+ .handler(new RegistrationRemoteHandlerClientUDP(threadName, (RegistrationWrapperClient) registrationWrapper, workerEventLoop));
+
+ // Enable to READ and WRITE MULTICAST data (ie, 192.168.1.0)
+ // in order to WRITE: write as normal, just make sure it ends in .255
+ // in order to LISTEN:
+ // InetAddress group = InetAddress.getByName("203.0.113.0");
+ // NioDatagramChannel.joinGroup(group);
+ // THEN once done
+ // NioDatagramChannel.leaveGroup(group), close the socket
+ udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
+ .option(ChannelOption.SO_SNDBUF, udpMaxSize);
+ }
+ }
+
+ /**
+ * Allows the client to reconnect to the last connected server
+ *
+ * @throws IOException
+ * if the client is unable to reconnect in the previously requested connection-timeout
+ */
+ public
+ void reconnect() throws IOException {
+ reconnect(connectionTimeout);
+ }
+
+ /**
+ * Allows the client to reconnect to the last connected server
+ *
+ * @throws IOException
+ * if the client is unable to reconnect in the requested time
+ */
+ public
+ void reconnect(final int connectionTimeout) throws IOException {
+ // make sure we are closed first
+ close();
+
+ connect(connectionTimeout);
+ }
+
+
+ /**
+ * will attempt to connect to the server, with a 30 second timeout.
+ *
+ * @throws IOException
+ * if the client is unable to connect in 30 seconds
+ */
+ public
+ void connect() throws IOException {
+ connect(30000);
+ }
+
+ /**
+ * will attempt to connect to the server, and will the specified timeout.
+ *
+ * will BLOCK until completed
+ *
+ * @param connectionTimeout
+ * wait for x milliseconds. 0 will wait indefinitely
+ *
+ * @throws IOException
+ * if the client is unable to connect in the requested time
+ */
+ public
+ void connect(final int connectionTimeout) throws IOException {
+ this.connectionTimeout = connectionTimeout;
+
+ // make sure we are not trying to connect during a close or stop event.
+ // This will wait until we have finished shutting down.
+ synchronized (shutdownInProgress) {
+ }
+
+ // if we are in the SAME thread as netty -- start in a new thread (otherwise we will deadlock)
+ if (isNettyThread()) {
+ runNewThread("Restart Thread", new Runnable(){
+ @Override
+ public
+ void run() {
+ try {
+ connect(connectionTimeout);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ return;
+ }
+
+
+
+ if (isShutdown()) {
+ throw new IOException("Unable to connect when shutdown...");
+ }
+
+ if (localChannelName != null) {
+ logger.info("Connecting to local server: {}", localChannelName);
+ }
+ else {
+ if (config.tcpPort > 0 && config.udpPort > 0) {
+ logger.info("Connecting to TCP/UDP server [{}:{}]", hostName, config.tcpPort, config.udpPort);
+ }
+ else if (config.tcpPort > 0) {
+ logger.info("Connecting to TCP server [{}:{}]", hostName, config.tcpPort);
+ }
+ else {
+ logger.info("Connecting to UDP server [{}:{}]", hostName, config.udpPort);
+ }
+ }
+
+ // have to start the registration process. This will wait until registration is complete and RMI methods are initialized
+ // if this is called in the event dispatch thread for netty, it will deadlock!
+ startRegistration();
+
+
+ if (config.tcpPort == 0 && config.udpPort > 0) {
+ // AFTER registration is complete, if we are UDP only -- setup a heartbeat (must be the larger of 2x the idle timeout OR 10 seconds)
+ startUdpHeartbeat();
+ }
+ }
+
+ @Override
+ public
+ boolean hasRemoteKeyChanged() {
+ return connection.hasRemoteKeyChanged();
+ }
+
+ /**
+ * @return the remote address, as a string.
+ */
+ @Override
+ public
+ String getRemoteHost() {
+ return connection.getRemoteHost();
+ }
+
+ /**
+ * @return true if this connection is established on the loopback interface
+ */
+ @Override
+ public
+ boolean isLoopback() {
+ return connection.isLoopback();
+ }
+
+ /**
+ * @return the connection (TCP or LOCAL) id of this connection.
+ */
+ @Override
+ public
+ int id() {
+ return connection.id();
+ }
+
+ /**
+ * @return the connection (TCP or LOCAL) id of this connection as a HEX string.
+ */
+ @Override
+ public
+ String idAsHex() {
+ return connection.idAsHex();
+ }
+
+ @Override
+ public
+ boolean hasUDP() {
+ return connection.hasUDP();
+ }
+
+ /**
+ * Expose methods to send objects to a destination when the connection has become idle.
+ */
+ @Override
+ public
+ IdleBridge sendOnIdle(IdleSender, ?> sender) {
+ return connection.sendOnIdle(sender);
+ }
+
+ /**
+ * Expose methods to send objects to a destination when the connection has become idle.
+ */
+ @Override
+ public
+ IdleBridge sendOnIdle(Object message) {
+ return connection.sendOnIdle(message);
+ }
+
+ /**
+ * Marks the connection to be closed as soon as possible. This is evaluated when the current
+ * thread execution returns to the network stack.
+ */
+ @Override
+ public
+ void closeAsap() {
+ connection.closeAsap();
+ }
+
+ /**
+ * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map"
+ * to an object that is created remotely.
+ *
+ * The callback will be notified when the remote object has been created.
+ *
+ *
+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the
+ * {@link RemoteObject#setResponseTimeout(int) response timeout}.
+ *
+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must
+ * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a
+ * void return value can be called on the update thread.
+ *
+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
+ * will have the proxy object replaced with the registered (non-proxy) object.
+ *
+ * If one wishes to change the default behavior, cast the object to access the different methods.
+ * ie: `RemoteObject remoteObject = (RemoteObject) test;`
+ *
+ * @see RemoteObject
+ */
+ @Override
+ public
+ void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) {
+ try {
+ connection.createRemoteObject(interfaceClass, callback);
+ } catch (NullPointerException e) {
+ logger.error("Error creating remote object!", e);
+ }
+ }
+
+ /**
+ * Tells the remote connection to create a new proxy object that implements the specified interface. The methods on this object "map"
+ * to an object that is created remotely.
+ *
+ * The callback will be notified when the remote object has been created.
+ *
+ *
+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the
+ * {@link RemoteObject#setResponseTimeout(int) response timeout}.
+ *
+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must
+ * not be called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a
+ * void return value can be called on the update thread.
+ *
+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side
+ * will have the proxy object replaced with the registered (non-proxy) object.
+ *
+ * If one wishes to change the default behavior, cast the object to access the different methods.
+ * ie: `RemoteObject remoteObject = (RemoteObject) test;`
+ *
+ * @see RemoteObject
+ */
+ @Override
+ public
+ void getRemoteObject(final int objectId, final RemoteObjectCallback callback) {
+ try {
+ connection.getRemoteObject(objectId, callback);
+ } catch (NullPointerException e) {
+ logger.error("Error getting remote object!", e);
+ }
+ }
+
+ /**
+ * Fetches the connection used by the client.
+ *
+ * Make sure that you only call this after the client connects!
+ *
+ * This is preferred to {@link EndPoint#getConnections()}, as it properly does some error checking
+ */
+ @SuppressWarnings("unchecked")
+ public
+ C getConnection() {
+ return (C) connection;
+ }
+
+ /**
+ * Closes all connections ONLY (keeps the client running), does not remove any listeners. To STOP the client, use stop().
+ *
+ * This is used, for example, when reconnecting to a server.
+ */
+ @Override
+ public
+ void close() {
+ closeConnection();
+ }
+
+ /**
+ * Checks to see if this client has connected yet or not.
+ *
+ * @return true if we are connected, false otherwise.
+ */
+ public
+ boolean isConnected() {
+ return super.isConnected.get();
+ }
+}
+
diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java
index fe7360a4..39097ea8 100644
--- a/src/dorkbox/network/Server.java
+++ b/src/dorkbox/network/Server.java
@@ -1,485 +1,485 @@
-/*
- * Copyright 2010 dorkbox, llc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package dorkbox.network;
-
-import static dorkbox.network.pipeline.ConnectionType.LOCAL;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.List;
-
-import dorkbox.network.connection.Connection;
-import dorkbox.network.connection.EndPoint;
-import dorkbox.network.connection.EndPointServer;
-import dorkbox.network.connection.RegistrationWrapperServer;
-import dorkbox.network.connection.connectionType.ConnectionRule;
-import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer;
-import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP;
-import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerUDP;
-import dorkbox.network.pipeline.discovery.BroadcastResponse;
-import dorkbox.util.OS;
-import dorkbox.util.Property;
-import dorkbox.util.exceptions.SecurityException;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.bootstrap.SessionBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollDatagramChannel;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.kqueue.KQueueDatagramChannel;
-import io.netty.channel.kqueue.KQueueServerSocketChannel;
-import io.netty.channel.local.LocalAddress;
-import io.netty.channel.local.LocalServerChannel;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.oio.OioDatagramChannel;
-import io.netty.channel.socket.oio.OioServerSocketChannel;
-import io.netty.handler.ipfilter.IpFilterRule;
-import io.netty.handler.ipfilter.IpFilterRuleType;
-import io.netty.handler.ipfilter.IpSubnetFilterRule;
-import io.netty.util.NetUtil;
-
-/**
- * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
- * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
- *
- * To put it bluntly, ONLY have the server do work inside of a listener!
- */
-public
-class Server extends EndPointServer {
-
- /**
- * Rule that will always allow LOCALHOST to connect to the server. This is not added by default
- */
- public static final IpFilterRule permitLocalHostRule = new IpSubnetFilterRule(NetUtil.LOCALHOST, 32, IpFilterRuleType.ACCEPT);
-
- /**
- * Gets the version number.
- */
- public static
- String getVersion() {
- return "4.0";
- }
-
- /**
- * The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when the
- * queue is full, the connection is refused.
- */
- @Property
- public static int backlogConnectionCount = 50;
-
- private final ServerBootstrap localBootstrap;
- private final ServerBootstrap tcpBootstrap;
- private final SessionBootstrap udpBootstrap;
-
- private final int tcpPort;
- private final int udpPort;
-
- private final String localChannelName;
- private final String hostName;
-
- private volatile boolean isRunning = false;
-
-
- /**
- * Starts a LOCAL only server, with the default serialization scheme.
- */
- public
- Server() throws SecurityException {
- this(Configuration.localOnly());
- }
-
- /**
- * Convenience method to starts a server with the specified Connection Options
- */
- @SuppressWarnings("AutoBoxing")
- public
- Server(Configuration config) throws SecurityException {
- // watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so
- // you have to make sure to use this.serialization
- super(config);
-
- tcpPort = config.tcpPort;
- udpPort = config.udpPort;
-
- localChannelName = config.localChannelName;
-
- if (config.host == null) {
- // we set this to "0.0.0.0" so that it is clear that we are trying to bind to that address.
- hostName = "0.0.0.0";
-
- // make it clear that this is what we do (configuration wise) so that variable examination is consistent
- config.host = hostName;
- }
- else {
- hostName = config.host;
- }
-
-
- if (localChannelName != null) {
- localBootstrap = new ServerBootstrap();
- }
- else {
- localBootstrap = null;
- }
-
- if (tcpPort > 0) {
- tcpBootstrap = new ServerBootstrap();
- }
- else {
- tcpBootstrap = null;
- }
-
- if (udpPort > 0) {
- // This is what allows us to have UDP behave "similar" to TCP, in that a session is established based on the port/ip of the
- // remote connection. This allows us to reuse channels and have "state" for a UDP connection that normally wouldn't exist.
- // Additionally, this is what responds to discovery broadcast packets
- udpBootstrap = new SessionBootstrap(tcpPort, udpPort);
- }
- else {
- udpBootstrap = null;
- }
-
-
- String threadName = Server.class.getSimpleName();
-
- // always use local channels on the server.
- if (localBootstrap != null) {
- localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"),
- newEventLoop(LOCAL, 1, threadName ))
- .channel(LocalServerChannel.class)
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
- .localAddress(new LocalAddress(localChannelName))
- .childHandler(new RegistrationLocalHandlerServer(threadName, (RegistrationWrapperServer) registrationWrapper));
- }
-
-
- EventLoopGroup workerEventLoop = null;
- if (tcpBootstrap != null || udpBootstrap != null) {
- workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
- }
-
- if (tcpBootstrap != null) {
- if (OS.isAndroid()) {
- // android ONLY supports OIO (not NIO)
- tcpBootstrap.channel(OioServerSocketChannel.class);
- }
- else if (OS.isLinux() && NativeLibrary.isAvailable()) {
- // epoll network stack is MUCH faster (but only on linux)
- tcpBootstrap.channel(EpollServerSocketChannel.class);
- }
- else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
- // KQueue network stack is MUCH faster (but only on macosx)
- tcpBootstrap.channel(KQueueServerSocketChannel.class);
- }
- else {
- tcpBootstrap.channel(NioServerSocketChannel.class);
- }
-
- // TODO: If we use netty for an HTTP server,
- // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
-
- tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"),
- newEventLoop(1, threadName + "-TCP-REGISTRATION"))
- .option(ChannelOption.SO_BACKLOG, backlogConnectionCount)
- .option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
-
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop));
-
- // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address!
- if (hostName.equals("0.0.0.0")) {
- tcpBootstrap.localAddress(tcpPort);
- }
- else {
- tcpBootstrap.localAddress(hostName, tcpPort);
- }
-
-
- // android screws up on this!!
- tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
- .childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid());
- }
-
-
- if (udpBootstrap != null) {
- if (OS.isAndroid()) {
- // android ONLY supports OIO (not NIO)
- udpBootstrap.channel(OioDatagramChannel.class);
- }
- else if (OS.isLinux() && NativeLibrary.isAvailable()) {
- // epoll network stack is MUCH faster (but only on linux)
- udpBootstrap.channel(EpollDatagramChannel.class);
- }
- else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
- // KQueue network stack is MUCH faster (but only on macosx)
- udpBootstrap.channel(KQueueDatagramChannel.class);
- }
- else {
- // windows and linux/mac that are incompatible with the native implementations
- udpBootstrap.channel(NioDatagramChannel.class);
- }
-
-
- // Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify
- FixedRecvByteBufAllocator recvByteBufAllocator = new FixedRecvByteBufAllocator(EndPoint.udpMaxSize);
-
- udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"),
- newEventLoop(1, threadName + "-UDP-REGISTRATION"))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator)
- .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
-
- // a non-root user can't receive a broadcast packet on *nix if the socket is bound on non-wildcard address.
- // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
- // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
- .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
- .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop));
-
- // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
- // if (hostName.equals("0.0.0.0")) {
- // udpBootstrap.localAddress(hostName, tcpPort);
- // }
- // else {
- // udpBootstrap.localAddress(udpPort);
- // }
-
- // Enable to READ from MULTICAST data (ie, 192.168.1.0)
- // in order to WRITE: write as normal, just make sure it ends in .255
- // in order to LISTEN:
- // InetAddress group = InetAddress.getByName("203.0.113.0");
- // socket.joinGroup(group);
- // THEN once done
- // socket.leaveGroup(group), close the socket
- // Enable to WRITE to MULTICAST data (ie, 192.168.1.0)
- udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
- .option(ChannelOption.SO_SNDBUF, udpMaxSize);
- }
- }
-
- /**
- * Binds the server to the configured, underlying protocols.
- *
- * This method will also BLOCK until the stop method is called, and if you want to continue running code after this method invocation,
- * bind should be called in a separate, non-daemon thread.
- */
- public
- void bind() {
- bind(true);
- }
-
- /**
- * Binds the server to the configured, underlying protocols.
- *
- * This is a more advanced method, and you should consider calling bind()
instead.
- *
- * @param blockUntilTerminate
- * will BLOCK until the server stop method is called, and if you want to continue running code after this method
- * invocation, bind should be called in a separate, non-daemon thread - or with false as the parameter.
- */
- @SuppressWarnings("AutoBoxing")
- public
- void bind(boolean blockUntilTerminate) {
- // make sure we are not trying to connect during a close or stop event.
- // This will wait until we have finished starting up/shutting down.
- synchronized (shutdownInProgress) {
- }
-
-
- // The bootstraps will be accessed ONE AT A TIME, in this order!
- ChannelFuture future;
-
- // LOCAL
- if (localBootstrap != null) {
- try {
- future = localBootstrap.bind();
- future.await();
- } catch (InterruptedException e) {
- throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", e);
- }
-
- if (!future.isSuccess()) {
- throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", future.cause());
- }
-
- logger.info("Listening on LOCAL address: [{}]", localChannelName);
- manageForShutdown(future);
- }
-
-
- // TCP
- if (tcpBootstrap != null) {
- // Wait until the connection attempt succeeds or fails.
- try {
- future = tcpBootstrap.bind();
- future.await();
- } catch (Exception e) {
- stop();
- throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", e);
- }
-
- if (!future.isSuccess()) {
- stop();
- throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", future.cause());
- }
-
- logger.info("TCP server listen address [{}:{}]", hostName, tcpPort);
- manageForShutdown(future);
- }
-
- // UDP
- if (udpBootstrap != null) {
- // Wait until the connection attempt succeeds or fails.
- try {
- future = udpBootstrap.bind();
- future.await();
- } catch (Exception e) {
- throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", e);
- }
-
- if (!future.isSuccess()) {
- throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.",
- future.cause());
- }
-
- logger.info("UDP server listen address [{}:{}]", hostName, udpPort);
- manageForShutdown(future);
- }
-
- isRunning = true;
-
- // we now BLOCK until the stop method is called.
- // if we want to continue running code in the server, bind should be called in a separate, non-daemon thread.
- if (blockUntilTerminate) {
- waitForShutdown();
- }
- }
-
- /**
- * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server.
- *
- * If there are any IP+subnet added to this list - then ONLY those are permitted (all else are denied)
- *
- * If there is nothing added to this list - then ALL are permitted
- */
- public
- void addIpFilter(IpFilterRule... rules) {
- ipFilterRules.addAll(Arrays.asList(rules));
- }
-
- /**
- * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have.
- * - NOTHING : Nothing happens to the in/out bytes
- * - COMPRESS: The in/out bytes are compressed with LZ4-fast
- * - COMPRESS_AND_ENCRYPT: The in/out bytes are compressed (LZ4-fast) THEN encrypted (AES-256-GCM)
- *
- * If no rules are defined, then for LOOPBACK, it will always be `COMPRESS` and for everything else it will always be `COMPRESS_AND_ENCRYPT`.
- *
- * If rules are defined, then everything by default is `COMPRESS_AND_ENCRYPT`.
- *
- * The compression algorithm is LZ4-fast, so there is a small performance impact for a very large gain
- * Compress : 6.210 micros/op; 629.0 MB/s (output: 55.4%)
- * Uncompress : 0.641 micros/op; 6097.9 MB/s
- */
- public
- void addConnectionTypeFilter(ConnectionRule... rules) {
- connectionRules.addAll(Arrays.asList(rules));
- }
-
- // called when we are stopped/shut down
- @Override
- protected
- void stopExtraActions() {
- isRunning = false;
-
- // now WAIT until bind has released the socket
- // wait a max of 10 tries
- int tries = 10;
- while (tries-- > 0 && isRunning(this.config)) {
- logger.warn("Server has requested shutdown, but the socket is still bound. Waiting {} more times", tries);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
- }
- }
- }
-
- /**
- * @return true if this server has successfully bound to an IP address and is running
- */
- public
- boolean isRunning() {
- return isRunning;
- }
-
- /**
- * Checks to see if a server (using the specified configuration) is running. This will check across JVMs by checking the
- * network socket directly, and assumes that if the port is in use and answers, then the server is "running". This does not try to
- * authenticate or validate the connection.
- *
- * This does not check local-channels (which are intra-JVM only). Uses `Broadcast` to check for UDP servers
- *
- *
- * @return true if the configuration matches and can connect (but not verify) to the TCP control socket.
- */
- public static
- boolean isRunning(Configuration config) {
- String host = config.host;
-
- // for us, we want a "null" host to connect to the "any" interface.
- if (host == null) {
- host = "0.0.0.0";
- }
-
- if (config.tcpPort > 0) {
- Socket sock = null;
- // since we check the socket, if we cannot connect to a socket, then we're done.
- try {
- sock = new Socket(host, config.tcpPort);
- // if we can connect to the socket, it means that we are already running.
- return sock.isConnected();
- } catch (Exception ignored) {
- if (sock != null) {
- try {
- sock.close();
- } catch (IOException ignored2) {
- }
- }
- }
- }
-
- // use Broadcast to see if there is a UDP server connected
- if (config.udpPort > 0) {
- List broadcastResponses = null;
- try {
- broadcastResponses = Broadcast.discoverHosts0(null, config.udpPort, 500, true);
- return !broadcastResponses.isEmpty();
- } catch (IOException ignored) {
- }
- }
-
- return false;
- }
-}
-
+/*
+ * Copyright 2010 dorkbox, llc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dorkbox.network;
+
+import static dorkbox.network.pipeline.ConnectionType.LOCAL;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.List;
+
+import dorkbox.network.connection.Connection;
+import dorkbox.network.connection.EndPoint;
+import dorkbox.network.connection.EndPointServer;
+import dorkbox.network.connection.RegistrationWrapperServer;
+import dorkbox.network.connection.connectionType.ConnectionRule;
+import dorkbox.network.connection.registration.local.RegistrationLocalHandlerServer;
+import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerTCP;
+import dorkbox.network.connection.registration.remote.RegistrationRemoteHandlerServerUDP;
+import dorkbox.network.pipeline.discovery.BroadcastResponse;
+import dorkbox.util.OS;
+import dorkbox.util.Property;
+import dorkbox.util.exceptions.SecurityException;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.bootstrap.SessionBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.kqueue.KQueueDatagramChannel;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalServerChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.oio.OioDatagramChannel;
+import io.netty.channel.socket.oio.OioServerSocketChannel;
+import io.netty.handler.ipfilter.IpFilterRule;
+import io.netty.handler.ipfilter.IpFilterRuleType;
+import io.netty.handler.ipfilter.IpSubnetFilterRule;
+import io.netty.util.NetUtil;
+
+/**
+ * The server can only be accessed in an ASYNC manner. This means that the server can only be used in RESPONSE to events. If you access the
+ * server OUTSIDE of events, you will get inaccurate information from the server (such as getConnections())
+ *
+ * To put it bluntly, ONLY have the server do work inside of a listener!
+ */
+public
+class Server extends EndPointServer {
+
+ /**
+ * Rule that will always allow LOCALHOST to connect to the server. This is not added by default
+ */
+ public static final IpFilterRule permitLocalHostRule = new IpSubnetFilterRule(NetUtil.LOCALHOST, 32, IpFilterRuleType.ACCEPT);
+
+ /**
+ * Gets the version number.
+ */
+ public static
+ String getVersion() {
+ return "4.1";
+ }
+
+ /**
+ * The maximum queue length for incoming connection indications (a request to connect). If a connection indication arrives when the
+ * queue is full, the connection is refused.
+ */
+ @Property
+ public static int backlogConnectionCount = 50;
+
+ private final ServerBootstrap localBootstrap;
+ private final ServerBootstrap tcpBootstrap;
+ private final SessionBootstrap udpBootstrap;
+
+ private final int tcpPort;
+ private final int udpPort;
+
+ private final String localChannelName;
+ private final String hostName;
+
+ private volatile boolean isRunning = false;
+
+
+ /**
+ * Starts a LOCAL only server, with the default serialization scheme.
+ */
+ public
+ Server() throws SecurityException {
+ this(Configuration.localOnly());
+ }
+
+ /**
+ * Convenience method to starts a server with the specified Connection Options
+ */
+ @SuppressWarnings("AutoBoxing")
+ public
+ Server(Configuration config) throws SecurityException {
+ // watch-out for serialization... it can be NULL incoming. The EndPoint (superclass) sets it, if null, so
+ // you have to make sure to use this.serialization
+ super(config);
+
+ tcpPort = config.tcpPort;
+ udpPort = config.udpPort;
+
+ localChannelName = config.localChannelName;
+
+ if (config.host == null) {
+ // we set this to "0.0.0.0" so that it is clear that we are trying to bind to that address.
+ hostName = "0.0.0.0";
+
+ // make it clear that this is what we do (configuration wise) so that variable examination is consistent
+ config.host = hostName;
+ }
+ else {
+ hostName = config.host;
+ }
+
+
+ if (localChannelName != null) {
+ localBootstrap = new ServerBootstrap();
+ }
+ else {
+ localBootstrap = null;
+ }
+
+ if (tcpPort > 0) {
+ tcpBootstrap = new ServerBootstrap();
+ }
+ else {
+ tcpBootstrap = null;
+ }
+
+ if (udpPort > 0) {
+ // This is what allows us to have UDP behave "similar" to TCP, in that a session is established based on the port/ip of the
+ // remote connection. This allows us to reuse channels and have "state" for a UDP connection that normally wouldn't exist.
+ // Additionally, this is what responds to discovery broadcast packets
+ udpBootstrap = new SessionBootstrap(tcpPort, udpPort);
+ }
+ else {
+ udpBootstrap = null;
+ }
+
+
+ String threadName = Server.class.getSimpleName();
+
+ // always use local channels on the server.
+ if (localBootstrap != null) {
+ localBootstrap.group(newEventLoop(LOCAL, 1, threadName + "-JVM-BOSS"),
+ newEventLoop(LOCAL, 1, threadName ))
+ .channel(LocalServerChannel.class)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
+ .localAddress(new LocalAddress(localChannelName))
+ .childHandler(new RegistrationLocalHandlerServer(threadName, (RegistrationWrapperServer) registrationWrapper));
+ }
+
+
+ EventLoopGroup workerEventLoop = null;
+ if (tcpBootstrap != null || udpBootstrap != null) {
+ workerEventLoop = newEventLoop(config.workerThreadPoolSize, threadName);
+ }
+
+ if (tcpBootstrap != null) {
+ if (OS.isAndroid()) {
+ // android ONLY supports OIO (not NIO)
+ tcpBootstrap.channel(OioServerSocketChannel.class);
+ }
+ else if (OS.isLinux() && NativeLibrary.isAvailable()) {
+ // epoll network stack is MUCH faster (but only on linux)
+ tcpBootstrap.channel(EpollServerSocketChannel.class);
+ }
+ else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
+ // KQueue network stack is MUCH faster (but only on macosx)
+ tcpBootstrap.channel(KQueueServerSocketChannel.class);
+ }
+ else {
+ tcpBootstrap.channel(NioServerSocketChannel.class);
+ }
+
+ // TODO: If we use netty for an HTTP server,
+ // Beside the usual ChannelOptions the Native Transport allows to enable TCP_CORK which may come in handy if you implement a HTTP Server.
+
+ tcpBootstrap.group(newEventLoop(1, threadName + "-TCP-BOSS"),
+ newEventLoop(1, threadName + "-TCP-REGISTRATION"))
+ .option(ChannelOption.SO_BACKLOG, backlogConnectionCount)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
+
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childHandler(new RegistrationRemoteHandlerServerTCP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop));
+
+ // have to check options.host for "0.0.0.0". we don't bind to "0.0.0.0", we bind to "null" to get the "any" address!
+ if (hostName.equals("0.0.0.0")) {
+ tcpBootstrap.localAddress(tcpPort);
+ }
+ else {
+ tcpBootstrap.localAddress(hostName, tcpPort);
+ }
+
+
+ // android screws up on this!!
+ tcpBootstrap.option(ChannelOption.TCP_NODELAY, !OS.isAndroid())
+ .childOption(ChannelOption.TCP_NODELAY, !OS.isAndroid());
+ }
+
+
+ if (udpBootstrap != null) {
+ if (OS.isAndroid()) {
+ // android ONLY supports OIO (not NIO)
+ udpBootstrap.channel(OioDatagramChannel.class);
+ }
+ else if (OS.isLinux() && NativeLibrary.isAvailable()) {
+ // epoll network stack is MUCH faster (but only on linux)
+ udpBootstrap.channel(EpollDatagramChannel.class);
+ }
+ else if (OS.isMacOsX() && NativeLibrary.isAvailable()) {
+ // KQueue network stack is MUCH faster (but only on macosx)
+ udpBootstrap.channel(KQueueDatagramChannel.class);
+ }
+ else {
+ // windows and linux/mac that are incompatible with the native implementations
+ udpBootstrap.channel(NioDatagramChannel.class);
+ }
+
+
+ // Netty4 has a default of 2048 bytes as upper limit for datagram packets, we want this to be whatever we specify
+ FixedRecvByteBufAllocator recvByteBufAllocator = new FixedRecvByteBufAllocator(EndPoint.udpMaxSize);
+
+ udpBootstrap.group(newEventLoop(1, threadName + "-UDP-BOSS"),
+ newEventLoop(1, threadName + "-UDP-REGISTRATION"))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFF_LOW, WRITE_BUFF_HIGH))
+
+ // a non-root user can't receive a broadcast packet on *nix if the socket is bound on non-wildcard address.
+ // TODO: move broadcast to it's own handler, and have UDP server be able to be bound to a specific IP
+ // OF NOTE: At the end in my case I decided to bind to .255 broadcast address on Linux systems. (to receive broadcast packets)
+ .localAddress(udpPort) // if you bind to a specific interface, Linux will be unable to receive broadcast packets! see: http://developerweb.net/viewtopic.php?id=5722
+ .childHandler(new RegistrationRemoteHandlerServerUDP(threadName, (RegistrationWrapperServer) registrationWrapper, workerEventLoop));
+
+ // // have to check options.host for null. we don't bind to 0.0.0.0, we bind to "null" to get the "any" address!
+ // if (hostName.equals("0.0.0.0")) {
+ // udpBootstrap.localAddress(hostName, tcpPort);
+ // }
+ // else {
+ // udpBootstrap.localAddress(udpPort);
+ // }
+
+ // Enable to READ from MULTICAST data (ie, 192.168.1.0)
+ // in order to WRITE: write as normal, just make sure it ends in .255
+ // in order to LISTEN:
+ // InetAddress group = InetAddress.getByName("203.0.113.0");
+ // socket.joinGroup(group);
+ // THEN once done
+ // socket.leaveGroup(group), close the socket
+ // Enable to WRITE to MULTICAST data (ie, 192.168.1.0)
+ udpBootstrap.option(ChannelOption.SO_BROADCAST, false)
+ .option(ChannelOption.SO_SNDBUF, udpMaxSize);
+ }
+ }
+
+ /**
+ * Binds the server to the configured, underlying protocols.
+ *
+ * This method will also BLOCK until the stop method is called, and if you want to continue running code after this method invocation,
+ * bind should be called in a separate, non-daemon thread.
+ */
+ public
+ void bind() {
+ bind(true);
+ }
+
+ /**
+ * Binds the server to the configured, underlying protocols.
+ *
+ * This is a more advanced method, and you should consider calling bind()
instead.
+ *
+ * @param blockUntilTerminate
+ * will BLOCK until the server stop method is called, and if you want to continue running code after this method
+ * invocation, bind should be called in a separate, non-daemon thread - or with false as the parameter.
+ */
+ @SuppressWarnings("AutoBoxing")
+ public
+ void bind(boolean blockUntilTerminate) {
+ // make sure we are not trying to connect during a close or stop event.
+ // This will wait until we have finished starting up/shutting down.
+ synchronized (shutdownInProgress) {
+ }
+
+
+ // The bootstraps will be accessed ONE AT A TIME, in this order!
+ ChannelFuture future;
+
+ // LOCAL
+ if (localBootstrap != null) {
+ try {
+ future = localBootstrap.bind();
+ future.await();
+ } catch (InterruptedException e) {
+ throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", e);
+ }
+
+ if (!future.isSuccess()) {
+ throw new IllegalArgumentException("Could not bind to LOCAL address '" + localChannelName + "' on the server.", future.cause());
+ }
+
+ logger.info("Listening on LOCAL address: [{}]", localChannelName);
+ manageForShutdown(future);
+ }
+
+
+ // TCP
+ if (tcpBootstrap != null) {
+ // Wait until the connection attempt succeeds or fails.
+ try {
+ future = tcpBootstrap.bind();
+ future.await();
+ } catch (Exception e) {
+ stop();
+ throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", e);
+ }
+
+ if (!future.isSuccess()) {
+ stop();
+ throw new IllegalArgumentException("Could not bind to address " + hostName + " TCP port " + tcpPort + " on the server.", future.cause());
+ }
+
+ logger.info("TCP server listen address [{}:{}]", hostName, tcpPort);
+ manageForShutdown(future);
+ }
+
+ // UDP
+ if (udpBootstrap != null) {
+ // Wait until the connection attempt succeeds or fails.
+ try {
+ future = udpBootstrap.bind();
+ future.await();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.", e);
+ }
+
+ if (!future.isSuccess()) {
+ throw new IllegalArgumentException("Could not bind to address " + hostName + " UDP port " + udpPort + " on the server.",
+ future.cause());
+ }
+
+ logger.info("UDP server listen address [{}:{}]", hostName, udpPort);
+ manageForShutdown(future);
+ }
+
+ isRunning = true;
+
+ // we now BLOCK until the stop method is called.
+ // if we want to continue running code in the server, bind should be called in a separate, non-daemon thread.
+ if (blockUntilTerminate) {
+ waitForShutdown();
+ }
+ }
+
+ /**
+ * Adds an IP+subnet rule that defines if that IP+subnet is allowed/denied connectivity to this server.
+ *
+ * If there are any IP+subnet added to this list - then ONLY those are permitted (all else are denied)
+ *
+ * If there is nothing added to this list - then ALL are permitted
+ */
+ public
+ void addIpFilter(IpFilterRule... rules) {
+ ipFilterRules.addAll(Arrays.asList(rules));
+ }
+
+ /**
+ * Adds an IP+subnet rule that defines what type of connection this IP+subnet should have.
+ * - NOTHING : Nothing happens to the in/out bytes
+ * - COMPRESS: The in/out bytes are compressed with LZ4-fast
+ * - COMPRESS_AND_ENCRYPT: The in/out bytes are compressed (LZ4-fast) THEN encrypted (AES-256-GCM)
+ *
+ * If no rules are defined, then for LOOPBACK, it will always be `COMPRESS` and for everything else it will always be `COMPRESS_AND_ENCRYPT`.
+ *
+ * If rules are defined, then everything by default is `COMPRESS_AND_ENCRYPT`.
+ *
+ * The compression algorithm is LZ4-fast, so there is a small performance impact for a very large gain
+ * Compress : 6.210 micros/op; 629.0 MB/s (output: 55.4%)
+ * Uncompress : 0.641 micros/op; 6097.9 MB/s
+ */
+ public
+ void addConnectionTypeFilter(ConnectionRule... rules) {
+ connectionRules.addAll(Arrays.asList(rules));
+ }
+
+ // called when we are stopped/shut down
+ @Override
+ protected
+ void stopExtraActions() {
+ isRunning = false;
+
+ // now WAIT until bind has released the socket
+ // wait a max of 10 tries
+ int tries = 10;
+ while (tries-- > 0 && isRunning(this.config)) {
+ logger.warn("Server has requested shutdown, but the socket is still bound. Waiting {} more times", tries);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ /**
+ * @return true if this server has successfully bound to an IP address and is running
+ */
+ public
+ boolean isRunning() {
+ return isRunning;
+ }
+
+ /**
+ * Checks to see if a server (using the specified configuration) is running. This will check across JVMs by checking the
+ * network socket directly, and assumes that if the port is in use and answers, then the server is "running". This does not try to
+ * authenticate or validate the connection.
+ *
+ * This does not check local-channels (which are intra-JVM only). Uses `Broadcast` to check for UDP servers
+ *
+ *
+ * @return true if the configuration matches and can connect (but not verify) to the TCP control socket.
+ */
+ public static
+ boolean isRunning(Configuration config) {
+ String host = config.host;
+
+ // for us, we want a "null" host to connect to the "any" interface.
+ if (host == null) {
+ host = "0.0.0.0";
+ }
+
+ if (config.tcpPort > 0) {
+ Socket sock = null;
+ // since we check the socket, if we cannot connect to a socket, then we're done.
+ try {
+ sock = new Socket(host, config.tcpPort);
+ // if we can connect to the socket, it means that we are already running.
+ return sock.isConnected();
+ } catch (Exception ignored) {
+ if (sock != null) {
+ try {
+ sock.close();
+ } catch (IOException ignored2) {
+ }
+ }
+ }
+ }
+
+ // use Broadcast to see if there is a UDP server connected
+ if (config.udpPort > 0) {
+ List broadcastResponses = null;
+ try {
+ broadcastResponses = Broadcast.discoverHosts0(null, config.udpPort, 500, true);
+ return !broadcastResponses.isEmpty();
+ } catch (IOException ignored) {
+ }
+ }
+
+ return false;
+ }
+}
+