diff --git a/src/dorkbox/network/Client.java b/src/dorkbox/network/Client.java index c53afdfd..2eed95d1 100644 --- a/src/dorkbox/network/Client.java +++ b/src/dorkbox/network/Client.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import dorkbox.network.connection.BootstrapWrapper; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.EndPointClient; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; @@ -333,7 +333,7 @@ class Client extends EndPointClient implements Connecti @SuppressWarnings("rawtypes") @Override public - EndPoint getEndPoint() { + EndPointBase getEndPoint() { return this; } @@ -450,7 +450,7 @@ class Client extends EndPointClient implements Connecti *

* Make sure that you only call this after the client connects! *

- * This is preferred to {@link EndPoint#getConnections()} getConnections()}, as it properly does some error checking + * This is preferred to {@link EndPointBase#getConnections()} getConnections()}, as it properly does some error checking */ public C getConnection() { diff --git a/src/dorkbox/network/Configuration.java b/src/dorkbox/network/Configuration.java index 591fe70a..50512d89 100644 --- a/src/dorkbox/network/Configuration.java +++ b/src/dorkbox/network/Configuration.java @@ -17,7 +17,7 @@ package dorkbox.network; import java.util.concurrent.Executor; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.store.SettingsStore; import dorkbox.network.util.CryptoSerializationManager; @@ -76,7 +76,7 @@ class Configuration { public static Configuration localOnly() { Configuration configuration = new Configuration(); - configuration.localChannelName = EndPoint.LOCAL_CHANNEL; + configuration.localChannelName = EndPointBase.LOCAL_CHANNEL; return configuration; } diff --git a/src/dorkbox/network/DnsClient.java b/src/dorkbox/network/DnsClient.java index 714f035f..077806c5 100644 --- a/src/dorkbox/network/DnsClient.java +++ b/src/dorkbox/network/DnsClient.java @@ -22,16 +22,13 @@ import static io.netty.util.internal.ObjectUtil.intValue; import java.net.IDN; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.security.AccessControlException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -88,30 +85,7 @@ import io.netty.util.internal.PlatformDependent; */ @SuppressWarnings({"unused", "WeakerAccess"}) public -class DnsClient { - - // duplicated in EndPoint - static { - //noinspection Duplicates - try { - if (PlatformDependent.isAndroid()) { - // doesn't work when running from inside eclipse. - // Needed for NIO selectors on Android 2.2, and to force IPv4. - System.setProperty("java.net.preferIPv4Stack", Boolean.TRUE.toString()); - System.setProperty("java.net.preferIPv6Addresses", Boolean.FALSE.toString()); - } - - // java6 has stack overflow problems when loading certain classes in it's classloader. The result is a StackOverflow when - // loading them normally. This calls AND FIXES this issue. - if (OS.javaVersion == 6) { - if (PlatformDependent.hasUnsafe()) { - //noinspection ResultOfMethodCallIgnored - PlatformDependent.newFixedMpscQueue(8); - } - } - } catch (AccessControlException ignored) { - } - } +class DnsClient extends EndPoint { /** * This is a list of all of the public DNS servers to query, when submitting DNS queries @@ -241,13 +215,7 @@ class DnsClient { */ public DnsClient(Collection nameServerAddresses) { - // setup the thread group to easily ID what the following threads belong to (and their spawned threads...) - SecurityManager s = System.getSecurityManager(); - threadGroup = new ThreadGroup(s != null - ? s.getThreadGroup() - : Thread.currentThread() - .getThreadGroup(), THREAD_NAME); - threadGroup.setDaemon(true); + super(DnsClient.class); if (PlatformDependent.isAndroid()) { // android ONLY supports OIO (not NIO) @@ -264,6 +232,8 @@ class DnsClient { channelType = NioDatagramChannel.class; } + manageForShutdown(eventLoopGroup); + // NOTE: A/AAAA use the built-in decoder customDecoders.put(DnsRecordType.MX, new MailExchangerDecoder()); @@ -601,6 +571,35 @@ class DnsClient { return this; } + /** + * Clears the DNS resolver cache + */ + public + void reset() { + if (resolver == null) { + start(); + } + + clearResolver(); + } + + private + void clearResolver() { + resolver.resolveCache() + .clear(); + } + + @Override + protected + void stopExtraActions() { + clearResolver(); + + if (resolver != null) { + resolver.close(); // also closes the UDP channel that DNS client uses + } + } + + /** * Resolves a specific hostname A record * @@ -720,78 +719,6 @@ class DnsClient { return null; } - - /** - * Clears the DNS resolver cache - */ - public - void reset() { - if (resolver == null) { - start(); - } - - clearResolver(); - } - - private - void clearResolver() { - resolver.resolveCache() - .clear(); - } - - - /** - * Safely closes all associated resources/threads/connections - */ - public - void stop() { - // we also want to stop the thread group (but NOT in our current thread!) - if (Thread.currentThread() - .getThreadGroup() - .getName() - .equals(THREAD_NAME)) { - - Thread thread = new Thread(new Runnable() { - @Override - public - void run() { - DnsClient.this.stopInThread(); - } - }); - thread.setDaemon(false); - thread.setName("DnsClient Shutdown"); - thread.start(); - } - else { - stopInThread(); - } - } - - - private - void stopInThread() { - clearResolver(); - - if (resolver != null) { - resolver.close(); // also closes the UDP channel that DNS client uses - } - - // Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed. - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; - - - - // we want to WAIT until after the event executors have completed shutting down. - List> shutdownThreadList = new LinkedList>(); - - // now wait it them to finish! - // It can take a few seconds to shut down the executor. This will affect unit testing, where connections are quickly created/stopped - eventLoopGroup.shutdownGracefully(maxShutdownWaitTimeInMilliSeconds, maxShutdownWaitTimeInMilliSeconds * 4, TimeUnit.MILLISECONDS) - .syncUninterruptibly(); - - threadGroup.interrupt(); - } - public ServiceRecord resolveSRV(final String hostname) { return resolve(hostname, DnsRecordType.SRV); diff --git a/src/dorkbox/network/Server.java b/src/dorkbox/network/Server.java index bdac5970..76e46907 100644 --- a/src/dorkbox/network/Server.java +++ b/src/dorkbox/network/Server.java @@ -16,6 +16,7 @@ package dorkbox.network; import java.io.IOException; +import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; @@ -83,6 +84,8 @@ class Server extends EndPointServer { private final String localChannelName; private final String hostName; + private final CountDownLatch blockUntilDone = new CountDownLatch(1); + /** * Starts a LOCAL only server, with the default serialization scheme. */ diff --git a/src/dorkbox/network/connection/Connection.java b/src/dorkbox/network/connection/Connection.java index fb9679db..e866d6bd 100644 --- a/src/dorkbox/network/connection/Connection.java +++ b/src/dorkbox/network/connection/Connection.java @@ -47,7 +47,7 @@ interface Connection { * @return the endpoint associated with this connection */ @SuppressWarnings("rawtypes") - EndPoint getEndPoint(); + EndPointBase getEndPoint(); /** diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index 4a896676..694b4292 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -93,7 +93,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // while on the CLIENT, if the SERVER's ecc key has changed, the client will abort and show an error. private boolean remoteKeyChanged; - private final EndPoint endPoint; + private final EndPointBase endPointBaseConnection; // when true, the connection will be closed (either as RMI or as 'normal' listener execution) when the thread execution returns control // back to the network stack @@ -117,9 +117,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn */ @SuppressWarnings({"rawtypes", "unchecked"}) public - ConnectionImpl(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { + ConnectionImpl(final Logger logger, final EndPointBase endPointBaseConnection, final RmiBridge rmiBridge) { this.logger = logger; - this.endPoint = endPoint; + this.endPointBaseConnection = endPointBaseConnection; this.rmiBridge = rmiBridge; } @@ -214,8 +214,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn */ @Override public - EndPoint getEndPoint() { - return this.endPoint; + EndPointBase getEndPoint() { + return this.endPointBaseConnection; } /** @@ -614,7 +614,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn void close() { // only close if we aren't already in the middle of closing. if (this.closeInProgress.compareAndSet(false, true)) { - int idleTimeoutMs = this.endPoint.getIdleTimeout(); + int idleTimeoutMs = this.endPointBaseConnection.getIdleTimeout(); if (idleTimeoutMs == 0) { // default is 2 second timeout, in milliseconds. idleTimeoutMs = 2000; @@ -714,7 +714,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @Override public final Listeners add(Listener listener) { - if (this.endPoint instanceof EndPointServer) { + if (this.endPointBaseConnection instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- // I add one listener, and ALL connections are notified of that listener. @@ -726,15 +726,15 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn // is empty, we can remove it from this connection. synchronized (this) { if (this.localListenerManager == null) { - this.localListenerManager = ((EndPointServer) this.endPoint).addListenerManager(this); + this.localListenerManager = ((EndPointServer) this.endPointBaseConnection).addListenerManager(this); } this.localListenerManager.add(listener); } } else { - this.endPoint.listeners() - .add(listener); + this.endPointBaseConnection.listeners() + .add(listener); } return this; @@ -756,7 +756,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @Override public final Listeners remove(Listener listener) { - if (this.endPoint instanceof EndPointServer) { + if (this.endPointBaseConnection instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- // I add one listener, and ALL connections are notified of that listener. @@ -771,14 +771,14 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn this.localListenerManager.remove(listener); if (!this.localListenerManager.hasListeners()) { - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPointBaseConnection).removeListenerManager(this); } } } } else { - this.endPoint.listeners() - .remove(listener); + this.endPointBaseConnection.listeners() + .remove(listener); } return this; @@ -791,7 +791,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @Override public final Listeners removeAll() { - if (this.endPoint instanceof EndPointServer) { + if (this.endPointBaseConnection instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- // I add one listener, and ALL connections are notified of that listener. @@ -806,13 +806,13 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn this.localListenerManager.removeAll(); this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPointBaseConnection).removeListenerManager(this); } } } else { - this.endPoint.listeners() - .removeAll(); + this.endPointBaseConnection.listeners() + .removeAll(); } return this; @@ -826,7 +826,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @Override public final Listeners removeAll(Class classType) { - if (this.endPoint instanceof EndPointServer) { + if (this.endPointBaseConnection instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- // I add one listener, and ALL connections are notified of that listener. @@ -842,14 +842,14 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn if (!this.localListenerManager.hasListeners()) { this.localListenerManager = null; - ((EndPointServer) this.endPoint).removeListenerManager(this); + ((EndPointServer) this.endPointBaseConnection).removeListenerManager(this); } } } } else { - this.endPoint.listeners() - .removeAll(classType); + this.endPointBaseConnection.listeners() + .removeAll(classType); } return this; @@ -1111,7 +1111,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn public int getRegisteredId(final T object) { // always check local before checking global, because less contention on the synchronization - RmiBridge globalRmiBridge = endPoint.globalRmiBridge; + RmiBridge globalRmiBridge = endPointBaseConnection.globalRmiBridge; if (globalRmiBridge == null) { throw new NullPointerException("Unable to call 'getRegisteredId' when the globalRmiBridge is null!"); @@ -1155,7 +1155,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn public Object getImplementationObject(final int objectID) { if (RmiBridge.isGlobal(objectID)) { - RmiBridge globalRmiBridge = endPoint.globalRmiBridge; + RmiBridge globalRmiBridge = endPointBaseConnection.globalRmiBridge; if (globalRmiBridge == null) { throw new NullPointerException("Unable to call 'getRegisteredId' when the gloablRmiBridge is null!"); diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index bce1d50a..e5cb639a 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -294,7 +294,7 @@ class ConnectionManager implements Listeners, ISessionMana * Invoked when a message object was received from a remote peer. *

* If data is sent in response to this event, the connection data is automatically flushed to the wire. If the data is sent in a separate thread, - * {@link EndPoint#send().flush()} must be called manually. + * {@link EndPointBase#send().flush()} must be called manually. *

* {@link ISessionManager} */ diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index 04348cbb..510210e1 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -1,132 +1,30 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package dorkbox.network.connection; -import java.io.IOException; import java.security.AccessControlException; -import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.bouncycastle.crypto.AsymmetricCipherKeyPair; -import org.bouncycastle.crypto.params.ECPrivateKeyParameters; -import org.bouncycastle.crypto.params.ECPublicKeyParameters; import org.slf4j.Logger; -import dorkbox.network.Configuration; -import dorkbox.network.connection.bridge.ConnectionBridgeBase; -import dorkbox.network.connection.ping.PingSystemListener; -import dorkbox.network.connection.registration.MetaChannel; -import dorkbox.network.connection.wrapper.ChannelLocalWrapper; -import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; -import dorkbox.network.connection.wrapper.ChannelWrapper; -import dorkbox.network.pipeline.KryoEncoder; -import dorkbox.network.pipeline.KryoEncoderCrypto; -import dorkbox.network.rmi.RmiBridge; -import dorkbox.network.store.NullSettingsStore; -import dorkbox.network.store.SettingsStore; import dorkbox.util.OS; import dorkbox.util.Property; -import dorkbox.util.crypto.CryptoECC; -import dorkbox.util.entropy.Entropy; -import dorkbox.util.exceptions.InitializationException; -import dorkbox.util.exceptions.SecurityException; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; -import io.netty.util.NetUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.internal.PlatformDependent; /** - * represents the base of a client/server end point + * This is the highest level endpoint, for lifecycle support/management. */ -public abstract -class EndPoint { - // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! - // it results in severe UDP packet loss and contention. - // - // http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM - // also, a google search on just "INET97/proceedings/F3/F3_1.HTM" turns up interesting problems. - // Usually it's with ISPs. - - // TODO: will also want an UDP keepalive? (TCP is already there b/c of socket options, but might need a heartbeat to detect dead connections?) - // routers sometimes need a heartbeat to keep the connection - // TODO: maybe some sort of STUN-like connection keep-alive?? - - - public static final String LOCAL_CHANNEL = "local_channel"; - protected static final String shutdownHookName = "::SHUTDOWN_HOOK::"; - protected static final String stopTreadName = "::STOP_THREAD::"; - - /** - * The HIGH and LOW watermark points for connections - */ - @Property - public static final int WRITE_BUFF_HIGH = 32 * 1024; - @Property - public static final int WRITE_BUFF_LOW = 8 * 1024; - - public static final String THREADGROUP_NAME = "(Netty)"; - - /** - * this can be changed to a more specialized value, if necessary - */ - @Property - public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime() - .availableProcessors() * 2; - /** - * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. - */ - @Property - public static long maxShutdownWaitTimeInMilliSeconds = 2000L; // in milliseconds - - /** - * The default size for UDP packets is 768 bytes. - *

- * You could increase or decrease this value to avoid truncated packets - * or to improve memory footprint respectively. - *

- * Please also note that a large UDP packet might be truncated or - * dropped by your router no matter how you configured this option. - * In UDP, a packet is truncated or dropped if it is larger than a - * certain size, depending on router configuration. IPv4 routers - * truncate and IPv6 routers drop a large packet. That's why it is - * safe to send small packets in UDP. - *

- * To fit into that magic 576-byte MTU and avoid fragmentation, your - * UDP payload should be restricted by 576-60-8=508 bytes. - * - * This can be set higher on an internal lan! - * - * DON'T go higher that 1400 over the internet, but 9k is possible - * with jumbo frames on a local network (if it's supported) - */ - @Property - public static int udpMaxSize = 508; - - +public +class EndPoint { // duplicated in DnsClient static { //noinspection Duplicates @@ -137,9 +35,10 @@ class EndPoint { System.setProperty("java.net.preferIPv6Addresses", Boolean.FALSE.toString()); // java6 has stack overflow problems when loading certain classes in it's classloader. The result is a StackOverflow when - // loading them normally + // loading them normally. This calls AND FIXES this issue. if (OS.javaVersion == 6) { if (PlatformDependent.hasUnsafe()) { + //noinspection ResultOfMethodCallIgnored PlatformDependent.newFixedMpscQueue(8); } } @@ -148,26 +47,40 @@ class EndPoint { } + protected static final String shutdownHookName = "::SHUTDOWN_HOOK::"; + protected static final String stopTreadName = "::STOP_THREAD::"; + + public static final String THREADGROUP_NAME = "(Netty)"; + + /** + * The HIGH and LOW watermark points for connections + */ + @Property + public static final int WRITE_BUFF_HIGH = 32 * 1024; + @Property + public static final int WRITE_BUFF_LOW = 8 * 1024; + + /** + * this can be changed to a more specialized value, if necessary + */ + @Property + public static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime() + .availableProcessors() * 2; + + /** + * The amount of time in milli-seconds to wait for this endpoint to close all {@link Channel}s and shutdown gracefully. + */ + @Property + public static long maxShutdownWaitTimeInMilliSeconds = 2000L; // in milliseconds + + protected final org.slf4j.Logger logger; protected final ThreadGroup threadGroup; - protected final Class> type; - protected final ConnectionManager connectionManager; - protected final dorkbox.network.util.CryptoSerializationManager serializationManager; - protected final RegistrationWrapper registrationWrapper; + protected final Class type; protected final Object shutdownInProgress = new Object(); - final ECPrivateKeyParameters privateKey; - final ECPublicKeyParameters publicKey; - - final SecureRandom secureRandom; - final RmiBridge globalRmiBridge; - - private final CountDownLatch blockUntilDone = new CountDownLatch(1); - - private final Executor rmiExecutor; - private final boolean rmiEnabled; // the eventLoop groups are used to track and manage the event loops for startup/shutdown private final List eventLoopGroups = new ArrayList(8); @@ -176,128 +89,30 @@ class EndPoint { // make sure that the endpoint is closed on JVM shutdown (if it's still open at that point in time) private Thread shutdownHook; + private final CountDownLatch blockUntilDone = new CountDownLatch(1); + private AtomicBoolean stopCalled = new AtomicBoolean(false); - private AtomicBoolean isConnected = new AtomicBoolean(false); - SettingsStore propertyStore; - boolean disableRemoteKeyValidation; - - /** - * in milliseconds. default is disabled! - */ - private volatile int idleTimeoutMs = 0; - - - /** - * @param type this is either "Client" or "Server", depending on who is creating this endpoint. - * @param options these are the specific connection options - * @throws InitializationException - * @throws SecurityException - */ - @SuppressWarnings({"unchecked", "rawtypes"}) public - EndPoint(Class type, final Configuration options) throws InitializationException, SecurityException, IOException { - this.type = (Class>) type; + EndPoint(final Class type) { + this.type = type; // setup the thread group to easily ID what the following threads belong to (and their spawned threads...) SecurityManager s = System.getSecurityManager(); threadGroup = new ThreadGroup(s != null - ? s.getThreadGroup() - : Thread.currentThread() - .getThreadGroup(), type.getSimpleName() + " " + THREADGROUP_NAME); + ? s.getThreadGroup() + : Thread.currentThread() + .getThreadGroup(), type.getSimpleName() + " " + THREADGROUP_NAME); threadGroup.setDaemon(true); this.logger = org.slf4j.LoggerFactory.getLogger(type.getSimpleName()); - // make sure that 'localhost' is ALWAYS our specific loopback IP address - if (options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) { - // localhost IP might not always be 127.0.0.1 - options.host = NetUtil.LOCALHOST.getHostAddress(); - } - - // serialization stuff - if (options.serialization != null) { - this.serializationManager = options.serialization; - } else { - this.serializationManager = CryptoSerializationManager.DEFAULT(); - } - - // setup our RMI serialization managers. Can only be called once - rmiEnabled = serializationManager.initRmiSerialization(); - rmiExecutor = options.rmiExecutor; - - - // The registration wrapper permits the registration process to access protected/package fields/methods, that we don't want - // to expose to external code. "this" escaping can be ignored, because it is benign. - //noinspection ThisEscapedInObjectConstruction - this.registrationWrapper = new RegistrationWrapper(this, - this.logger, - new KryoEncoder(this.serializationManager), - new KryoEncoderCrypto(this.serializationManager)); - - - // we have to be able to specify WHAT property store we want to use, since it can change! - if (options.settingsStore == null) { - this.propertyStore = new PropertyStore(); - } - else { - this.propertyStore = options.settingsStore; - } - - this.propertyStore.init(this.serializationManager, null); - - // null it out, since it is sensitive! - options.settingsStore = null; - - - if (!(this.propertyStore instanceof NullSettingsStore)) { - // initialize the private/public keys used for negotiating ECC handshakes - // these are ONLY used for IP connections. LOCAL connections do not need a handshake! - ECPrivateKeyParameters privateKey = this.propertyStore.getPrivateKey(); - ECPublicKeyParameters publicKey = this.propertyStore.getPublicKey(); - - if (privateKey == null || publicKey == null) { - try { - // seed our RNG based off of this and create our ECC keys - byte[] seedBytes = Entropy.get("There are no ECC keys for the " + type.getSimpleName() + " yet"); - SecureRandom secureRandom = new SecureRandom(seedBytes); - secureRandom.nextBytes(seedBytes); - - this.logger.debug("Now generating ECC (" + CryptoECC.curve25519 + ") keys. Please wait!"); - AsymmetricCipherKeyPair generateKeyPair = CryptoECC.generateKeyPair(CryptoECC.curve25519, secureRandom); - - privateKey = (ECPrivateKeyParameters) generateKeyPair.getPrivate(); - publicKey = (ECPublicKeyParameters) generateKeyPair.getPublic(); - - // save to properties file - this.propertyStore.savePrivateKey(privateKey); - this.propertyStore.savePublicKey(publicKey); - - this.logger.debug("Done with ECC keys!"); - } catch (Exception e) { - String message = "Unable to initialize/generate ECC keys. FORCED SHUTDOWN."; - this.logger.error(message); - throw new InitializationException(message); - } - } - - this.privateKey = privateKey; - this.publicKey = publicKey; - } - else { - this.privateKey = null; - this.publicKey = null; - } - - - this.secureRandom = new SecureRandom(this.propertyStore.getSalt()); this.shutdownHook = new Thread() { @Override public void run() { - // connectionManager.shutdown accurately reflects the state of the app. Safe to use here - if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown.get()) { + if (EndPoint.this.shouldShutdownHookRun()) { EndPoint.this.stop(); } } @@ -309,89 +124,6 @@ class EndPoint { } catch (Throwable ignored) { // if we are in the middle of shutdown, we cannot do this. } - - - // we don't care about un-instantiated/constructed members, since the class type is the only interest. - this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); - - // add the ping listener (internal use only!) - this.connectionManager.add(new PingSystemListener()); - - if (this.rmiEnabled) { - // these register the listener for registering a class implementation for RMI (internal use only) - this.connectionManager.add(new RegisterRmiSystemListener()); - this.globalRmiBridge = new RmiBridge(logger, options.rmiExecutor, true); - } - else { - this.globalRmiBridge = null; - } - - serializationManager.finishInit(); - } - - /** - * Disables remote endpoint public key validation when the connection is established. This is not recommended as it is a security risk - */ - public - void disableRemoteKeyValidation() { - Logger logger2 = this.logger; - - if (isConnected()) { - logger2.error("Cannot disable the remote key validation after this endpoint is connected!"); - } - else { - logger2.info("WARNING: Disabling remote key validation is a security risk!!"); - this.disableRemoteKeyValidation = true; - } - } - - /** - * Returns the property store used by this endpoint. The property store can store via properties, - * a database, etc, or can be a "null" property store, which does nothing - */ - @SuppressWarnings("unchecked") - public - S getPropertyStore() { - return (S) this.propertyStore; - } - - /** - * Internal call by the pipeline to notify the client to continue registering the different session protocols. - * The server does not use this. - */ - protected - boolean registerNextProtocol0() { - return true; - } - - /** - * The amount of milli-seconds that must elapse with no read or write before {@link Listener.OnIdle#idle(Connection)} } - * will be triggered - */ - public - int getIdleTimeout() { - return this.idleTimeoutMs; - } - - /** - * The {@link Listener:idle()} will be triggered when neither read nor write - * has happened for the specified period of time (in milli-seconds) - *
- * Specify {@code 0} to disable (default). - */ - public - void setIdleTimeout(int idleTimeoutMs) { - this.idleTimeoutMs = idleTimeoutMs; - } - - /** - * Return the connection status of this endpoint. - *

- * Once a server has connected to ANY client, it will always return true until server.close() is called - */ - public final - boolean isConnected() { - return this.isConnected.get(); } /** @@ -414,155 +146,8 @@ class EndPoint { } } - /** - * Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. - */ - public - dorkbox.network.util.CryptoSerializationManager getSerialization() { - return this.serializationManager; - } - - /** - * This method allows the connections used by the client/server to be subclassed (custom implementations). - *

- * As this is for the network stack, the new connection MUST subclass {@link ConnectionImpl} - *

- * The parameters are ALL NULL when getting the base class, as this instance is just thrown away. - * - * @return a new network connection - */ - protected - ConnectionImpl newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - return new ConnectionImpl(logger, endPoint, rmiBridge); - } - - /** - * Internal call by the pipeline when: - * - creating a new network connection - * - when determining the baseClass for listeners - * - * @param metaChannel can be NULL (when getting the baseClass) - */ - @SuppressWarnings("unchecked") - protected final - Connection connection0(MetaChannel metaChannel) { - ConnectionImpl connection; - - RmiBridge rmiBridge = null; - if (metaChannel != null && rmiEnabled) { - rmiBridge = new RmiBridge(logger, rmiExecutor, false); - } - - // setup the extras needed by the network connection. - // These properties are ASSIGNED in the same thread that CREATED the object. Only the AES info needs to be - // volatile since it is the only thing that changes. - if (metaChannel != null) { - ChannelWrapper wrapper; - - connection = newConnection(logger, this, rmiBridge); - metaChannel.connection = connection; - - if (metaChannel.localChannel != null) { - wrapper = new ChannelLocalWrapper(metaChannel); - } - else { - if (this instanceof EndPointServer) { - wrapper = new ChannelNetworkWrapper(metaChannel, this.registrationWrapper); - } - else { - wrapper = new ChannelNetworkWrapper(metaChannel, null); - } - } - - // now initialize the connection channels with whatever extra info they might need. - connection.init(wrapper, (ConnectionManager) this.connectionManager); - - if (rmiBridge != null) { - // notify our remote object space that it is able to receive method calls. - connection.listeners() - .add(rmiBridge.getListener()); - } - } - else { - // getting the connection baseClass - - // have to add the networkAssociate to a map of "connected" computers - connection = newConnection(null, null, null); - } - - return connection; - } - - /** - * Internal call by the pipeline to notify the "Connection" object that it has "connected", meaning that modifications - * to the pipeline are finished. - *

- * Only the CLIENT injects in front of this) - */ - @SuppressWarnings("unchecked") - void connectionConnected0(ConnectionImpl connection) { - this.isConnected.set(true); - - // prep the channel wrapper - connection.prep(); - - this.connectionManager.onConnected((C) connection); - } - - /** - * Expose methods to modify the listeners (connect/disconnect/idle/receive events). - */ - public final - Listeners listeners() { - return this.connectionManager; - } - - /** - * Returns a non-modifiable list of active connections - */ - public - List getConnections() { - return this.connectionManager.getConnections(); - } - - /** - * Returns a non-modifiable list of active connections - */ - @SuppressWarnings("unchecked") - public - Collection getConnectionsAs() { - return this.connectionManager.getConnections(); - } - - /** - * Expose methods to send objects to a destination. - */ - public abstract - ConnectionBridgeBase send(); - - /** - * Closes all connections ONLY (keeps the server/client running). To STOP the client/server, use stop(). - *

- * This is used, for example, when reconnecting to a server. - *

- * The server should ALWAYS use STOP. - */ - public - void closeConnections() { - // give a chance to other threads. - Thread.yield(); - - // stop does the same as this + more - this.connectionManager.closeConnections(); - - // Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed. - this.registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds); - - this.isConnected.set(false); - } - // server only does this on stop. Client does this on closeConnections - protected void shutdownChannels() { + void shutdownChannels() { synchronized (shutdownChannelList) { // now we stop all of our channels for (ChannelFuture f : this.shutdownChannelList) { @@ -579,6 +164,7 @@ class EndPoint { } } + protected final String stopWithErrorMessage(Logger logger2, String errorMessage, Throwable throwable) { if (logger2.isDebugEnabled() && throwable != null) { @@ -593,6 +179,16 @@ class EndPoint { return errorMessage; } + /** + * Starts the shutdown process during JVM shutdown, if necessary. + *

+ * By default, we always can shutdown via the JVM shutdown hook. + */ + protected + boolean shouldShutdownHookRun() { + return true; + } + /** * Safely closes all associated resources/threads/connections. *

@@ -653,6 +249,28 @@ class EndPoint { } } + /** + * Extra EXTERNAL actions to perform when stopping this endpoint. + */ + protected + void stopExtraActions() { + } + + /** + * Actions that happen by the endpoint before the channels are shutdown + */ + protected + void shutdownChannelsPre() { + } + + + /** + * Actions that happen by the endpoint before any extra actions are run. + */ + protected + void stopExtraActionsInternal() { + } + // This actually does the "stopping", since there is some logic to making sure we don't deadlock, this is important private void stopInThread() { @@ -676,14 +294,11 @@ class EndPoint { Thread.yield(); } - closeConnections(); - - // this does a closeConnections + clear_listeners - this.connectionManager.stop(); + shutdownChannelsPre(); shutdownChannels(); - this.logger.info("Stopping endpoint"); + this.logger.info("Stopping endpoint."); // there is no need to call "stop" again if we close the connection. // however, if this is called WHILE from the shutdown hook, blammo! problems! @@ -701,10 +316,7 @@ class EndPoint { } } - - // shutdown the database store - this.propertyStore.close(); - + stopExtraActionsInternal(); // when the eventloop closes, the associated selectors are ALSO closed! stopExtraActions(); @@ -717,13 +329,6 @@ class EndPoint { this.blockUntilDone.countDown(); } - /** - * Extra EXTERNAL actions to perform when stopping this endpoint. - */ - public - void stopExtraActions() { - } - /** * Blocks the current thread until the endpoint has been stopped. If the endpoint is already stopped, this do nothing. */ @@ -737,50 +342,6 @@ class EndPoint { } } - @Override - public - int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (this.privateKey == null ? 0 : this.privateKey.hashCode()); - result = prime * result + (this.publicKey == null ? 0 : this.publicKey.hashCode()); - return result; - } - - @SuppressWarnings("rawtypes") - @Override - public - boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - EndPoint other = (EndPoint) obj; - - if (this.privateKey == null) { - if (other.privateKey != null) { - return false; - } - } - else if (!CryptoECC.compare(this.privateKey, other.privateKey)) { - return false; - } - if (this.publicKey == null) { - if (other.publicKey != null) { - return false; - } - } - else if (!CryptoECC.compare(this.publicKey, other.publicKey)) { - return false; - } - return true; - } - @Override public String toString() { @@ -791,15 +352,4 @@ class EndPoint { String getName() { return this.type.getSimpleName(); } - - /** - * Creates a "global" RMI object for use by multiple connections. - * @return the ID assigned to this RMI object - */ - public - int createGlobalObject(final T globalObject) { - int globalObjectId = globalRmiBridge.nextObjectId(); - globalRmiBridge.register(globalObjectId, globalObject); - return globalObjectId; - } } diff --git a/src/dorkbox/network/connection/EndPointBase.java b/src/dorkbox/network/connection/EndPointBase.java new file mode 100644 index 00000000..9706b084 --- /dev/null +++ b/src/dorkbox/network/connection/EndPointBase.java @@ -0,0 +1,523 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection; + +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.bouncycastle.crypto.AsymmetricCipherKeyPair; +import org.bouncycastle.crypto.params.ECPrivateKeyParameters; +import org.bouncycastle.crypto.params.ECPublicKeyParameters; +import org.slf4j.Logger; + +import dorkbox.network.Configuration; +import dorkbox.network.connection.bridge.ConnectionBridgeBase; +import dorkbox.network.connection.ping.PingSystemListener; +import dorkbox.network.connection.registration.MetaChannel; +import dorkbox.network.connection.wrapper.ChannelLocalWrapper; +import dorkbox.network.connection.wrapper.ChannelNetworkWrapper; +import dorkbox.network.connection.wrapper.ChannelWrapper; +import dorkbox.network.pipeline.KryoEncoder; +import dorkbox.network.pipeline.KryoEncoderCrypto; +import dorkbox.network.rmi.RmiBridge; +import dorkbox.network.store.NullSettingsStore; +import dorkbox.network.store.SettingsStore; +import dorkbox.util.Property; +import dorkbox.util.crypto.CryptoECC; +import dorkbox.util.entropy.Entropy; +import dorkbox.util.exceptions.InitializationException; +import dorkbox.util.exceptions.SecurityException; +import io.netty.util.NetUtil; + +/** + * represents the base of a client/server end point + */ +public abstract +class EndPointBase extends EndPoint { + // If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets! + // it results in severe UDP packet loss and contention. + // + // http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM + // also, a google search on just "INET97/proceedings/F3/F3_1.HTM" turns up interesting problems. + // Usually it's with ISPs. + + // TODO: will also want an UDP keepalive? (TCP is already there b/c of socket options, but might need a heartbeat to detect dead connections?) + // routers sometimes need a heartbeat to keep the connection + // TODO: maybe some sort of STUN-like connection keep-alive?? + + + public static final String LOCAL_CHANNEL = "local_channel"; + + /** + * The default size for UDP packets is 768 bytes. + *

+ * You could increase or decrease this value to avoid truncated packets + * or to improve memory footprint respectively. + *

+ * Please also note that a large UDP packet might be truncated or + * dropped by your router no matter how you configured this option. + * In UDP, a packet is truncated or dropped if it is larger than a + * certain size, depending on router configuration. IPv4 routers + * truncate and IPv6 routers drop a large packet. That's why it is + * safe to send small packets in UDP. + *

+ * To fit into that magic 576-byte MTU and avoid fragmentation, your + * UDP payload should be restricted by 576-60-8=508 bytes. + * + * This can be set higher on an internal lan! + * + * DON'T go higher that 1400 over the internet, but 9k is possible + * with jumbo frames on a local network (if it's supported) + */ + @Property + public static int udpMaxSize = 508; + + protected final ConnectionManager connectionManager; + protected final dorkbox.network.util.CryptoSerializationManager serializationManager; + protected final RegistrationWrapper registrationWrapper; + + final ECPrivateKeyParameters privateKey; + final ECPublicKeyParameters publicKey; + + final SecureRandom secureRandom; + final RmiBridge globalRmiBridge; + + private final Executor rmiExecutor; + private final boolean rmiEnabled; + + SettingsStore propertyStore; + boolean disableRemoteKeyValidation; + + /** + * in milliseconds. default is disabled! + */ + private volatile int idleTimeoutMs = 0; + + private AtomicBoolean isConnected = new AtomicBoolean(false); + + + /** + * @param type this is either "Client" or "Server", depending on who is creating this endpoint. + * @param options these are the specific connection options + * + * @throws InitializationException + * @throws SecurityException + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public + EndPointBase(Class type, final Configuration options) throws InitializationException, SecurityException, IOException { + super(type); + + // make sure that 'localhost' is ALWAYS our specific loopback IP address + if (options.host != null && (options.host.equals("localhost") || options.host.startsWith("127."))) { + // localhost IP might not always be 127.0.0.1 + options.host = NetUtil.LOCALHOST.getHostAddress(); + } + + // serialization stuff + if (options.serialization != null) { + this.serializationManager = options.serialization; + } else { + this.serializationManager = CryptoSerializationManager.DEFAULT(); + } + + // setup our RMI serialization managers. Can only be called once + rmiEnabled = serializationManager.initRmiSerialization(); + rmiExecutor = options.rmiExecutor; + + + // The registration wrapper permits the registration process to access protected/package fields/methods, that we don't want + // to expose to external code. "this" escaping can be ignored, because it is benign. + //noinspection ThisEscapedInObjectConstruction + this.registrationWrapper = new RegistrationWrapper(this, + this.logger, + new KryoEncoder(this.serializationManager), + new KryoEncoderCrypto(this.serializationManager)); + + + // we have to be able to specify WHAT property store we want to use, since it can change! + if (options.settingsStore == null) { + this.propertyStore = new PropertyStore(); + } + else { + this.propertyStore = options.settingsStore; + } + + this.propertyStore.init(this.serializationManager, null); + + // null it out, since it is sensitive! + options.settingsStore = null; + + + if (!(this.propertyStore instanceof NullSettingsStore)) { + // initialize the private/public keys used for negotiating ECC handshakes + // these are ONLY used for IP connections. LOCAL connections do not need a handshake! + ECPrivateKeyParameters privateKey = this.propertyStore.getPrivateKey(); + ECPublicKeyParameters publicKey = this.propertyStore.getPublicKey(); + + if (privateKey == null || publicKey == null) { + try { + // seed our RNG based off of this and create our ECC keys + byte[] seedBytes = Entropy.get("There are no ECC keys for the " + type.getSimpleName() + " yet"); + SecureRandom secureRandom = new SecureRandom(seedBytes); + secureRandom.nextBytes(seedBytes); + + this.logger.debug("Now generating ECC (" + CryptoECC.curve25519 + ") keys. Please wait!"); + AsymmetricCipherKeyPair generateKeyPair = CryptoECC.generateKeyPair(CryptoECC.curve25519, secureRandom); + + privateKey = (ECPrivateKeyParameters) generateKeyPair.getPrivate(); + publicKey = (ECPublicKeyParameters) generateKeyPair.getPublic(); + + // save to properties file + this.propertyStore.savePrivateKey(privateKey); + this.propertyStore.savePublicKey(publicKey); + + this.logger.debug("Done with ECC keys!"); + } catch (Exception e) { + String message = "Unable to initialize/generate ECC keys. FORCED SHUTDOWN."; + this.logger.error(message); + throw new InitializationException(message); + } + } + + this.privateKey = privateKey; + this.publicKey = publicKey; + } + else { + this.privateKey = null; + this.publicKey = null; + } + + + this.secureRandom = new SecureRandom(this.propertyStore.getSalt()); + + // we don't care about un-instantiated/constructed members, since the class type is the only interest. + this.connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null).getClass()); + + // add the ping listener (internal use only!) + this.connectionManager.add(new PingSystemListener()); + + if (this.rmiEnabled) { + // these register the listener for registering a class implementation for RMI (internal use only) + this.connectionManager.add(new RegisterRmiSystemListener()); + this.globalRmiBridge = new RmiBridge(logger, options.rmiExecutor, true); + } + else { + this.globalRmiBridge = null; + } + + serializationManager.finishInit(); + } + + /** + * Disables remote endpoint public key validation when the connection is established. This is not recommended as it is a security risk + */ + public + void disableRemoteKeyValidation() { + Logger logger2 = this.logger; + + if (isConnected()) { + logger2.error("Cannot disable the remote key validation after this endpoint is connected!"); + } + else { + logger2.info("WARNING: Disabling remote key validation is a security risk!!"); + this.disableRemoteKeyValidation = true; + } + } + + /** + * Returns the property store used by this endpoint. The property store can store via properties, + * a database, etc, or can be a "null" property store, which does nothing + */ + @SuppressWarnings("unchecked") + public + S getPropertyStore() { + return (S) this.propertyStore; + } + + /** + * Internal call by the pipeline to notify the client to continue registering the different session protocols. + * The server does not use this. + */ + protected + boolean registerNextProtocol0() { + return true; + } + + /** + * The amount of milli-seconds that must elapse with no read or write before {@link Listener.OnIdle#idle(Connection)} } + * will be triggered + */ + public + int getIdleTimeout() { + return this.idleTimeoutMs; + } + + /** + * The {@link Listener:idle()} will be triggered when neither read nor write + * has happened for the specified period of time (in milli-seconds) + *
+ * Specify {@code 0} to disable (default). + */ + public + void setIdleTimeout(int idleTimeoutMs) { + this.idleTimeoutMs = idleTimeoutMs; + } + + /** + * Return the connection status of this endpoint. + *

+ * Once a server has connected to ANY client, it will always return true until server.close() is called + */ + public final + boolean isConnected() { + return this.isConnected.get(); + } + + /** + * Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. + */ + public + dorkbox.network.util.CryptoSerializationManager getSerialization() { + return this.serializationManager; + } + + /** + * This method allows the connections used by the client/server to be subclassed (custom implementations). + *

+ * As this is for the network stack, the new connection MUST subclass {@link ConnectionImpl} + *

+ * The parameters are ALL NULL when getting the base class, as this instance is just thrown away. + * + * @return a new network connection + */ + protected + ConnectionImpl newConnection(final Logger logger, final EndPointBase endPointBaseConnection, final RmiBridge rmiBridge) { + return new ConnectionImpl(logger, endPointBaseConnection, rmiBridge); + } + + /** + * Internal call by the pipeline when: + * - creating a new network connection + * - when determining the baseClass for listeners + * + * @param metaChannel can be NULL (when getting the baseClass) + */ + @SuppressWarnings("unchecked") + protected final + Connection connection0(MetaChannel metaChannel) { + ConnectionImpl connection; + + RmiBridge rmiBridge = null; + if (metaChannel != null && rmiEnabled) { + rmiBridge = new RmiBridge(logger, rmiExecutor, false); + } + + // setup the extras needed by the network connection. + // These properties are ASSIGNED in the same thread that CREATED the object. Only the AES info needs to be + // volatile since it is the only thing that changes. + if (metaChannel != null) { + ChannelWrapper wrapper; + + connection = newConnection(logger, this, rmiBridge); + metaChannel.connection = connection; + + if (metaChannel.localChannel != null) { + wrapper = new ChannelLocalWrapper(metaChannel); + } + else { + if (this instanceof EndPointServer) { + wrapper = new ChannelNetworkWrapper(metaChannel, this.registrationWrapper); + } + else { + wrapper = new ChannelNetworkWrapper(metaChannel, null); + } + } + + // now initialize the connection channels with whatever extra info they might need. + connection.init(wrapper, (ConnectionManager) this.connectionManager); + + if (rmiBridge != null) { + // notify our remote object space that it is able to receive method calls. + connection.listeners() + .add(rmiBridge.getListener()); + } + } + else { + // getting the connection baseClass + + // have to add the networkAssociate to a map of "connected" computers + connection = newConnection(null, null, null); + } + + return connection; + } + + /** + * Internal call by the pipeline to notify the "Connection" object that it has "connected", meaning that modifications + * to the pipeline are finished. + *

+ * Only the CLIENT injects in front of this) + */ + @SuppressWarnings("unchecked") + void connectionConnected0(ConnectionImpl connection) { + this.isConnected.set(true); + + // prep the channel wrapper + connection.prep(); + + this.connectionManager.onConnected((C) connection); + } + + /** + * Expose methods to modify the listeners (connect/disconnect/idle/receive events). + */ + public final + Listeners listeners() { + return this.connectionManager; + } + + /** + * Returns a non-modifiable list of active connections + */ + public + List getConnections() { + return this.connectionManager.getConnections(); + } + + /** + * Returns a non-modifiable list of active connections + */ + @SuppressWarnings("unchecked") + public + Collection getConnectionsAs() { + return this.connectionManager.getConnections(); + } + + /** + * Expose methods to send objects to a destination. + */ + public abstract + ConnectionBridgeBase send(); + + /** + * Closes all connections ONLY (keeps the server/client running). To STOP the client/server, use stop(). + *

+ * This is used, for example, when reconnecting to a server. + *

+ * The server should ALWAYS use STOP. + */ + public + void closeConnections() { + // give a chance to other threads. + Thread.yield(); + + // stop does the same as this + more + this.connectionManager.closeConnections(); + + // Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed. + this.registrationWrapper.closeChannels(maxShutdownWaitTimeInMilliSeconds); + + this.isConnected.set(false); + } + + /** + * Starts the shutdown process during JVM shutdown, if necessary. + *

+ * By default, we always can shutdown via the JVM shutdown hook. + */ + @Override + protected + boolean shouldShutdownHookRun() { + // connectionManager.shutdown accurately reflects the state of the app. Safe to use here + return (this.connectionManager != null && !this.connectionManager.shutdown.get()); + } + + @Override + protected + void shutdownChannelsPre() { + closeConnections(); + + // this does a closeConnections + clear_listeners + this.connectionManager.stop(); + } + + @Override + protected + void stopExtraActionsInternal() { + // shutdown the database store + this.propertyStore.close(); + } + + @Override + public + int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (this.privateKey == null ? 0 : this.privateKey.hashCode()); + result = prime * result + (this.publicKey == null ? 0 : this.publicKey.hashCode()); + return result; + } + + @SuppressWarnings("rawtypes") + @Override + public + boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + EndPointBase other = (EndPointBase) obj; + + if (this.privateKey == null) { + if (other.privateKey != null) { + return false; + } + } + else if (!CryptoECC.compare(this.privateKey, other.privateKey)) { + return false; + } + if (this.publicKey == null) { + if (other.publicKey != null) { + return false; + } + } + else if (!CryptoECC.compare(this.publicKey, other.publicKey)) { + return false; + } + return true; + } + + /** + * Creates a "global" RMI object for use by multiple connections. + * @return the ID assigned to this RMI object + */ + public + int createGlobalObject(final T globalObject) { + int globalObjectId = globalRmiBridge.nextObjectId(); + globalRmiBridge.register(globalObjectId, globalObject); + return globalObjectId; + } +} diff --git a/src/dorkbox/network/connection/EndPointClient.java b/src/dorkbox/network/connection/EndPointClient.java index a048a87d..0a92cee6 100644 --- a/src/dorkbox/network/connection/EndPointClient.java +++ b/src/dorkbox/network/connection/EndPointClient.java @@ -34,7 +34,7 @@ import io.netty.channel.ChannelOption; * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointClient extends EndPoint implements Runnable { +class EndPointClient extends EndPointBase implements Runnable { protected C connection; diff --git a/src/dorkbox/network/connection/EndPointServer.java b/src/dorkbox/network/connection/EndPointServer.java index d2154ef9..0ba54a19 100644 --- a/src/dorkbox/network/connection/EndPointServer.java +++ b/src/dorkbox/network/connection/EndPointServer.java @@ -27,7 +27,7 @@ import dorkbox.util.exceptions.SecurityException; * This serves the purpose of making sure that specific methods are not available to the end user. */ public -class EndPointServer extends EndPoint { +class EndPointServer extends EndPointBase { public EndPointServer(final Configuration options) throws InitializationException, SecurityException, IOException { diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index e79852a2..ef897d0b 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -61,7 +61,7 @@ class KryoExtra extends Kryo { // writing data - private final ByteBuf tempBuffer = Unpooled.buffer(EndPoint.udpMaxSize); + private final ByteBuf tempBuffer = Unpooled.buffer(EndPointBase.udpMaxSize); private LZ4Compressor compressor = factory.fastCompressor(); private int inputArrayLength = -1; diff --git a/src/dorkbox/network/connection/Listener.java b/src/dorkbox/network/connection/Listener.java index 2ae7054c..1abc86fc 100644 --- a/src/dorkbox/network/connection/Listener.java +++ b/src/dorkbox/network/connection/Listener.java @@ -62,12 +62,12 @@ interface Listener { /** - * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold. + * Called when the connection is idle for longer than the {@link EndPointBase#setIdleTimeout(int)} idle threshold. */ interface OnIdle extends Listener { /** - * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold. + * Called when the connection is idle for longer than the {@link EndPointBase#setIdleTimeout(int)} idle threshold. */ void idle(C connection) throws IOException; } diff --git a/src/dorkbox/network/connection/PropertyStore.java b/src/dorkbox/network/connection/PropertyStore.java index 9d71be38..6474de3b 100644 --- a/src/dorkbox/network/connection/PropertyStore.java +++ b/src/dorkbox/network/connection/PropertyStore.java @@ -80,7 +80,7 @@ class PropertyStore extends SettingsStore { @Override public synchronized ECPrivateKeyParameters getPrivateKey() throws dorkbox.util.exceptions.SecurityException { - checkAccess(EndPoint.class); + checkAccess(EndPointBase.class); return servers.get(DB_Server.IP_SELF) .getPrivateKey(); @@ -92,7 +92,7 @@ class PropertyStore extends SettingsStore { @Override public synchronized void savePrivateKey(final ECPrivateKeyParameters serverPrivateKey) throws SecurityException { - checkAccess(EndPoint.class); + checkAccess(EndPointBase.class); servers.get(DB_Server.IP_SELF) .setPrivateKey(serverPrivateKey); @@ -107,7 +107,7 @@ class PropertyStore extends SettingsStore { @Override public synchronized ECPublicKeyParameters getPublicKey() throws SecurityException { - checkAccess(EndPoint.class); + checkAccess(EndPointBase.class); return servers.get(DB_Server.IP_SELF) .getPublicKey(); @@ -119,7 +119,7 @@ class PropertyStore extends SettingsStore { @Override public synchronized void savePublicKey(final ECPublicKeyParameters serverPublicKey) throws SecurityException { - checkAccess(EndPoint.class); + checkAccess(EndPointBase.class); servers.get(DB_Server.IP_SELF) .setPublicKey(serverPublicKey); diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index 7e887b00..e7d9247e 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -53,7 +53,7 @@ class RegistrationWrapper implements UdpServer { private final KryoEncoder kryoEncoder; private final KryoEncoderCrypto kryoEncoderCrypto; - private final EndPoint endPoint; + private final EndPointBase endPointBaseConnection; // keeps track of connections (TCP/UDP-client) private final ReentrantLock channelMapLock = new ReentrantLock(); @@ -77,16 +77,16 @@ class RegistrationWrapper implements UdpServer { public - RegistrationWrapper(final EndPoint endPoint, + RegistrationWrapper(final EndPointBase endPointBaseConnection, final Logger logger, final KryoEncoder kryoEncoder, final KryoEncoderCrypto kryoEncoderCrypto) { - this.endPoint = endPoint; + this.endPointBaseConnection = endPointBaseConnection; this.logger = logger; this.kryoEncoder = kryoEncoder; this.kryoEncoderCrypto = kryoEncoderCrypto; - if (endPoint instanceof EndPointServer) { + if (endPointBaseConnection instanceof EndPointServer) { this.udpRemoteMap = new ObjectMap(32, ConnectionManager.LOAD_FACTOR); } else { @@ -99,7 +99,7 @@ class RegistrationWrapper implements UdpServer { */ public boolean rmiEnabled() { - return endPoint.globalRmiBridge != null; + return endPointBaseConnection.globalRmiBridge != null; } public @@ -135,7 +135,7 @@ class RegistrationWrapper implements UdpServer { */ public int getIdleTimeout() { - return this.endPoint.getIdleTimeout(); + return this.endPointBaseConnection.getIdleTimeout(); } /** @@ -146,7 +146,7 @@ class RegistrationWrapper implements UdpServer { */ public boolean registerNextProtocol0() { - return this.endPoint.registerNextProtocol0(); + return this.endPointBaseConnection.registerNextProtocol0(); } /** @@ -155,7 +155,7 @@ class RegistrationWrapper implements UdpServer { */ public void connectionConnected0(ConnectionImpl networkConnection) { - this.endPoint.connectionConnected0(networkConnection); + this.endPointBaseConnection.connectionConnected0(networkConnection); } /** @@ -166,22 +166,22 @@ class RegistrationWrapper implements UdpServer { */ public Connection connection0(MetaChannel metaChannel) { - return this.endPoint.connection0(metaChannel); + return this.endPointBaseConnection.connection0(metaChannel); } public SecureRandom getSecureRandom() { - return this.endPoint.secureRandom; + return this.endPointBaseConnection.secureRandom; } public ECPublicKeyParameters getPublicKey() { - return this.endPoint.publicKey; + return this.endPointBaseConnection.publicKey; } public CipherParameters getPrivateKey() { - return this.endPoint.privateKey; + return this.endPointBaseConnection.privateKey; } @@ -197,13 +197,13 @@ class RegistrationWrapper implements UdpServer { InetAddress address = tcpRemoteServer.getAddress(); byte[] hostAddress = address.getAddress(); - ECPublicKeyParameters savedPublicKey = this.endPoint.propertyStore.getRegisteredServerKey(hostAddress); + ECPublicKeyParameters savedPublicKey = this.endPointBaseConnection.propertyStore.getRegisteredServerKey(hostAddress); Logger logger2 = this.logger; if (savedPublicKey == null) { if (logger2.isDebugEnabled()) { logger2.debug("Adding new remote IP address key for {}", address.getHostAddress()); } - this.endPoint.propertyStore.addRegisteredServerKey(hostAddress, publicKey); + this.endPointBaseConnection.propertyStore.addRegisteredServerKey(hostAddress, publicKey); } else { // COMPARE! @@ -216,7 +216,7 @@ class RegistrationWrapper implements UdpServer { byAddress = "Unknown Address"; } - if (this.endPoint.disableRemoteKeyValidation) { + if (this.endPointBaseConnection.disableRemoteKeyValidation) { logger2.warn("Invalid or non-matching public key from remote server. Their public key has changed. To fix, remove entry for: {}", byAddress); return true; } @@ -234,7 +234,7 @@ class RegistrationWrapper implements UdpServer { @SuppressWarnings("AutoBoxing") public void removeRegisteredServerKey(final byte[] hostAddress) throws SecurityException { - ECPublicKeyParameters savedPublicKey = this.endPoint.propertyStore.getRegisteredServerKey(hostAddress); + ECPublicKeyParameters savedPublicKey = this.endPointBaseConnection.propertyStore.getRegisteredServerKey(hostAddress); if (savedPublicKey != null) { Logger logger2 = this.logger; if (logger2.isDebugEnabled()) { @@ -244,7 +244,7 @@ class RegistrationWrapper implements UdpServer { hostAddress[2], hostAddress[3]); } - this.endPoint.propertyStore.removeRegisteredServerKey(hostAddress); + this.endPointBaseConnection.propertyStore.removeRegisteredServerKey(hostAddress); } } @@ -316,8 +316,8 @@ class RegistrationWrapper implements UdpServer { public void abortRegistrationIfClient() { - if (this.endPoint instanceof EndPointClient) { - ((EndPointClient) this.endPoint).abortRegistration(); + if (this.endPointBaseConnection instanceof EndPointClient) { + ((EndPointClient) this.endPointBaseConnection).abortRegistration(); } } diff --git a/src/dorkbox/network/connection/registration/RegistrationHandler.java b/src/dorkbox/network/connection/registration/RegistrationHandler.java index 29811968..e303c4fc 100644 --- a/src/dorkbox/network/connection/registration/RegistrationHandler.java +++ b/src/dorkbox/network/connection/registration/RegistrationHandler.java @@ -16,7 +16,7 @@ package dorkbox.network.connection.registration; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.RegistrationWrapper; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; @@ -94,7 +94,7 @@ class RegistrationHandler extends ChannelInboundHandlerAda // also, once we notify, we unregister this. if (registrationWrapper != null) { - MetaChannel metaChannel = registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds); + MetaChannel metaChannel = registrationWrapper.closeChannel(channel, EndPointBase.maxShutdownWaitTimeInMilliSeconds); registrationWrapper.abortRegistrationIfClient(); return metaChannel; diff --git a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java index 43c88b47..28faf11a 100644 --- a/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java +++ b/src/dorkbox/network/connection/registration/local/RegistrationLocalHandler.java @@ -15,6 +15,10 @@ */ package dorkbox.network.connection.registration.local; +import static dorkbox.network.connection.EndPointBase.maxShutdownWaitTimeInMilliSeconds; + +import org.slf4j.Logger; + import dorkbox.network.connection.Connection; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; @@ -23,9 +27,6 @@ import dorkbox.network.pipeline.LocalRmiDecoder; import dorkbox.network.pipeline.LocalRmiEncoder; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; - -import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds; public abstract class RegistrationLocalHandler extends RegistrationHandler { diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 20c95625..9c1a3270 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -15,7 +15,7 @@ */ package dorkbox.network.connection.registration.remote; -import static dorkbox.network.connection.EndPoint.maxShutdownWaitTimeInMilliSeconds; +import static dorkbox.network.connection.EndPointBase.maxShutdownWaitTimeInMilliSeconds; import java.net.InetAddress; import java.net.InetSocketAddress; diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java index d48d4071..fedc18ac 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServerUDP.java @@ -26,7 +26,7 @@ import dorkbox.network.Broadcast; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.CryptoSerializationManager; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.RegistrationWrapper; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; @@ -266,7 +266,7 @@ class RegistrationRemoteHandlerServerUDP extends MessageTo // also, once we notify, we unregister this. if (registrationWrapper != null) { - return registrationWrapper.closeChannel(channel, EndPoint.maxShutdownWaitTimeInMilliSeconds); + return registrationWrapper.closeChannel(channel, EndPointBase.maxShutdownWaitTimeInMilliSeconds); } return null; diff --git a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java index 262f8995..2744f2f1 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java @@ -21,7 +21,7 @@ import org.bouncycastle.crypto.params.ParametersWithIV; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionPointWriter; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.registration.MetaChannel; import io.netty.channel.Channel; @@ -120,7 +120,7 @@ class ChannelLocalWrapper implements ChannelWrapper, Co @Override public void close(Connection connection, ISessionManager sessionManager) { - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; + long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds; this.shouldFlush.set(false); diff --git a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java index 68a26de7..7efed5b8 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java @@ -22,7 +22,7 @@ import org.bouncycastle.crypto.params.ParametersWithIV; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionPointWriter; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.UdpServer; import dorkbox.network.connection.registration.MetaChannel; @@ -166,7 +166,7 @@ class ChannelNetworkWrapper implements ChannelWrapper { @Override public void close(final Connection connection, final ISessionManager sessionManager) { - long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds; + long maxShutdownWaitTimeInMilliSeconds = EndPointBase.maxShutdownWaitTimeInMilliSeconds; this.tcp.close(maxShutdownWaitTimeInMilliSeconds); diff --git a/src/dorkbox/network/pipeline/LocalRmiEncoder.java b/src/dorkbox/network/pipeline/LocalRmiEncoder.java index c006c0f8..c970e221 100644 --- a/src/dorkbox/network/pipeline/LocalRmiEncoder.java +++ b/src/dorkbox/network/pipeline/LocalRmiEncoder.java @@ -22,7 +22,7 @@ import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.rmi.Rmi; import dorkbox.util.FastThreadLocal; import io.netty.channel.ChannelHandler.Sharable; @@ -33,7 +33,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; public class LocalRmiEncoder extends MessageToMessageEncoder { - private static final Map, Boolean> transformObjectCache = new ConcurrentHashMap, Boolean>(EndPoint.DEFAULT_THREAD_POOL_SIZE); + private static final Map, Boolean> transformObjectCache = new ConcurrentHashMap, Boolean>(EndPointBase.DEFAULT_THREAD_POOL_SIZE); private static final RmiFieldCache fieldCache = RmiFieldCache.INSTANCE(); private final FastThreadLocal> objectThreadLocals = new FastThreadLocal>() { diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java index e9a9cd97..e79e38d1 100644 --- a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java @@ -15,7 +15,13 @@ */ package dorkbox.network.pipeline.udp; -import dorkbox.network.connection.EndPoint; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +import org.slf4j.LoggerFactory; + +import dorkbox.network.connection.EndPointBase; import dorkbox.network.util.CryptoSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -23,11 +29,6 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageEncoder; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; @Sharable // UDP uses messages --- NOT bytebuf! @@ -35,7 +36,7 @@ import java.util.List; public class KryoEncoderUdp extends MessageToMessageEncoder { - private static final int maxSize = EndPoint.udpMaxSize; + private static final int maxSize = EndPointBase.udpMaxSize; private final CryptoSerializationManager serializationManager; diff --git a/src/dorkbox/network/rmi/CachedMethod.java b/src/dorkbox/network/rmi/CachedMethod.java index 8598f5af..6d8ae272 100644 --- a/src/dorkbox/network/rmi/CachedMethod.java +++ b/src/dorkbox/network/rmi/CachedMethod.java @@ -53,7 +53,7 @@ import com.esotericsoftware.kryo.util.Util; import com.esotericsoftware.reflectasm.MethodAccess; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.KryoExtra; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.network.util.RmiSerializationManager; @@ -101,7 +101,7 @@ class CachedMethod { }; // the purpose of the method cache, is to accelerate looking up methods for specific class - private static final Map, CachedMethod[]> methodCache = new ConcurrentHashMap, CachedMethod[]>(EndPoint.DEFAULT_THREAD_POOL_SIZE); + private static final Map, CachedMethod[]> methodCache = new ConcurrentHashMap, CachedMethod[]>(EndPointBase.DEFAULT_THREAD_POOL_SIZE); /** diff --git a/src/dorkbox/network/rmi/RmiBridge.java b/src/dorkbox/network/rmi/RmiBridge.java index 432df7f4..3f074e7a 100644 --- a/src/dorkbox/network/rmi/RmiBridge.java +++ b/src/dorkbox/network/rmi/RmiBridge.java @@ -49,7 +49,7 @@ import com.esotericsoftware.kryo.util.IntMap; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.Listener; import dorkbox.network.util.RmiSerializationManager; import dorkbox.util.collections.ObjectIntMap; @@ -197,7 +197,7 @@ class RmiBridge { /** * Invokes the method on the object and, if necessary, sends the result back to the connection that made the invocation request. This - * method is invoked on the update thread of the {@link EndPoint} for this RmiBridge and unless an executor has been set. + * method is invoked on the update thread of the {@link EndPointBase} for this RmiBridge and unless an executor has been set. * * @param connection * The remote side of this connection requested the invocation. diff --git a/src/dorkbox/network/rmi/RmiProxyHandler.java b/src/dorkbox/network/rmi/RmiProxyHandler.java index d8a309bf..107673e4 100644 --- a/src/dorkbox/network/rmi/RmiProxyHandler.java +++ b/src/dorkbox/network/rmi/RmiProxyHandler.java @@ -47,7 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.util.RmiSerializationManager; /** @@ -195,8 +195,8 @@ class RmiProxyHandler implements InvocationHandler { return proxyString; } - EndPoint endPoint = this.connection.getEndPoint(); - final RmiSerializationManager serializationManager = endPoint.getSerialization(); + EndPointBase endPointBaseConnection = this.connection.getEndPoint(); + final RmiSerializationManager serializationManager = endPointBaseConnection.getSerialization(); InvokeMethod invokeMethod = new InvokeMethod(); invokeMethod.objectID = this.objectID; diff --git a/test/dorkbox/network/BaseTest.java b/test/dorkbox/network/BaseTest.java index 9f250e11..ea4b8e27 100644 --- a/test/dorkbox/network/BaseTest.java +++ b/test/dorkbox/network/BaseTest.java @@ -20,7 +20,7 @@ package dorkbox.network; -import static dorkbox.network.connection.EndPoint.THREADGROUP_NAME; +import static dorkbox.network.connection.EndPointBase.THREADGROUP_NAME; import static org.junit.Assert.fail; import java.util.ArrayList; @@ -34,7 +34,7 @@ import ch.qos.logback.classic.encoder.PatternLayoutEncoder; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.ConsoleAppender; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.util.entropy.Entropy; import dorkbox.util.entropy.SimpleEntropy; import dorkbox.util.exceptions.InitializationException; @@ -57,7 +57,7 @@ class BaseTest { } volatile boolean fail_check; - private final ArrayList endPoints = new ArrayList(); + private final ArrayList endPointBaseConnections = new ArrayList(); public BaseTest() { @@ -107,8 +107,8 @@ class BaseTest { } public - void addEndPoint(final EndPoint endPoint) { - this.endPoints.add(endPoint); + void addEndPoint(final EndPointBase endPointBaseConnection) { + this.endPointBaseConnections.add(endPointBaseConnection); } /** @@ -157,12 +157,12 @@ class BaseTest { private void stopEndPoints_outsideThread() { - synchronized (BaseTest.this.endPoints) { - for (EndPoint endPoint : BaseTest.this.endPoints) { - endPoint.stop(); - endPoint.waitForShutdown(); + synchronized (BaseTest.this.endPointBaseConnections) { + for (EndPointBase endPointBaseConnection : BaseTest.this.endPointBaseConnections) { + endPointBaseConnection.stop(); + endPointBaseConnection.waitForShutdown(); } - BaseTest.this.endPoints.clear(); + BaseTest.this.endPointBaseConnections.clear(); } } @@ -226,8 +226,8 @@ class BaseTest { } while (true) { - synchronized (this.endPoints) { - if (this.endPoints.isEmpty()) { + synchronized (this.endPointBaseConnections) { + if (this.endPointBaseConnections.isEmpty()) { break; } } diff --git a/test/dorkbox/network/ConnectionTest.java b/test/dorkbox/network/ConnectionTest.java index 0855cae7..19fbe822 100644 --- a/test/dorkbox/network/ConnectionTest.java +++ b/test/dorkbox/network/ConnectionTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import dorkbox.network.connection.Connection; import dorkbox.network.connection.CryptoSerializationManager; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.Listener; import dorkbox.util.SerializationManager; import dorkbox.util.exceptions.InitializationException; @@ -42,7 +42,7 @@ class ConnectionTest extends BaseTest { System.out.println("---- " + "Local"); Configuration configuration = new Configuration(); - configuration.localChannelName = EndPoint.LOCAL_CHANNEL; + configuration.localChannelName = EndPointBase.LOCAL_CHANNEL; configuration.serialization = CryptoSerializationManager.DEFAULT(); register(configuration.serialization); diff --git a/test/dorkbox/network/ListenerTest.java b/test/dorkbox/network/ListenerTest.java index 4a5934e6..accf335f 100644 --- a/test/dorkbox/network/ListenerTest.java +++ b/test/dorkbox/network/ListenerTest.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listeners; import dorkbox.network.rmi.RmiBridge; @@ -64,8 +64,8 @@ class ListenerTest extends BaseTest { // quick and dirty test to also test connection sub-classing class TestConnectionA extends ConnectionImpl { public - TestConnectionA(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - super(logger, endPoint, rmiBridge); + TestConnectionA(final Logger logger, final EndPointBase endPointBaseConnection, final RmiBridge rmiBridge) { + super(logger, endPointBaseConnection, rmiBridge); } public @@ -77,8 +77,8 @@ class ListenerTest extends BaseTest { class TestConnectionB extends TestConnectionA { public - TestConnectionB(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { - super(logger, endPoint, rmiBridge); + TestConnectionB(final Logger logger, final EndPointBase endPointBaseConnection, final RmiBridge rmiBridge) { + super(logger, endPointBaseConnection, rmiBridge); } @Override @@ -109,7 +109,7 @@ class ListenerTest extends BaseTest { Server server = new Server(configuration) { @Override public - TestConnectionA newConnection(final Logger logger, final EndPoint endPoint, final RmiBridge rmiBridge) { + TestConnectionA newConnection(final Logger logger, final EndPointBase endPoint, final RmiBridge rmiBridge) { return new TestConnectionA(logger, endPoint, rmiBridge); } }; diff --git a/test/dorkbox/network/PingPongTest.java b/test/dorkbox/network/PingPongTest.java index c6568d49..55bf572e 100644 --- a/test/dorkbox/network/PingPongTest.java +++ b/test/dorkbox/network/PingPongTest.java @@ -30,7 +30,7 @@ import org.junit.Test; import dorkbox.network.connection.Connection; import dorkbox.network.connection.CryptoSerializationManager; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listeners; import dorkbox.util.SerializationManager; @@ -52,8 +52,8 @@ class PingPongTest extends BaseTest { public void pingPong() throws InitializationException, SecurityException, IOException, InterruptedException { // UDP data is kinda big. Make sure it fits into one packet. - int origSize = EndPoint.udpMaxSize; - EndPoint.udpMaxSize = 2048; + int origSize = EndPointBase.udpMaxSize; + EndPointBase.udpMaxSize = 2048; this.fail = "Data not received."; @@ -192,7 +192,7 @@ class PingPongTest extends BaseTest { fail(this.fail); } - EndPoint.udpMaxSize = origSize; + EndPointBase.udpMaxSize = origSize; } private static diff --git a/test/dorkbox/network/UnregisteredClassTest.java b/test/dorkbox/network/UnregisteredClassTest.java index 3772de44..e7ca4251 100644 --- a/test/dorkbox/network/UnregisteredClassTest.java +++ b/test/dorkbox/network/UnregisteredClassTest.java @@ -30,7 +30,7 @@ import org.junit.Test; import dorkbox.network.connection.Connection; import dorkbox.network.connection.CryptoSerializationManager; -import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.EndPointBase; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listeners; import dorkbox.util.exceptions.InitializationException; @@ -47,8 +47,8 @@ class UnregisteredClassTest extends BaseTest { @Test public void unregisteredClasses() throws InitializationException, SecurityException, IOException, InterruptedException { - int origSize = EndPoint.udpMaxSize; - EndPoint.udpMaxSize = 2048; + int origSize = EndPointBase.udpMaxSize; + EndPointBase.udpMaxSize = 2048; Configuration configuration = new Configuration(); configuration.tcpPort = tcpPort; @@ -179,7 +179,7 @@ class UnregisteredClassTest extends BaseTest { fail(this.fail); } - EndPoint.udpMaxSize = origSize; + EndPointBase.udpMaxSize = origSize; } private