Cleanup. pre-kotlin conversion

This commit is contained in:
nathan 2020-06-05 19:50:16 +02:00
parent 794965348a
commit 4c8c50e8a3
17 changed files with 69 additions and 291 deletions

View File

@ -132,37 +132,63 @@ class Server<C extends Connection> extends EndPointServer {
isRunning = true;
try (final Publication publication = this.setupAllClientsPublication()) {
try (final Subscription subscription = this.setupAllClientsSubscription()) {
Publication publication = null;
Subscription subscription = null;
FragmentHandler handler = null;
try {
publication = EchoChannels.createPublicationDynamicMDC(this.aeron,
this.config.listenIpAddress,
this.config.controlPort,
UDP_STREAM_ID);
/**
* Note: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice.
*
* Note: There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy.
*/
final FragmentHandler handler = new FragmentAssembler((buffer, offset, length, header)->this.onInitialClientMessage(
publication,
buffer,
offset,
length,
header));
subscription = EchoChannels.createSubscriptionWithHandlers(this.aeron,
this.config.listenIpAddress,
this.config.port,
UDP_STREAM_ID,
this::onInitialClientConnected,
this::onInitialClientDisconnected);
while (true) {
this.executor.execute(()->{
subscription.poll(handler, 100); // this checks to see if there are NEW clients
this.clients.poll(); // this manages existing clients
});
try {
Thread.sleep(100L);
} catch (final InterruptedException e) {
Thread.currentThread()
.interrupt();
}
/**
* Note: Reassembly has been shown to be minimal impact to latency. But not totally negligible. If the lowest latency is desired, then limiting message sizes to MTU size is a good practice.
*
* Note: There is a maximum length allowed for messages which is the min of 1/8th a term length or 16MB. Messages larger than this should chunked using an application level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy.
*/
final Publication finalPublication = publication;
handler = new FragmentAssembler((buffer, offset, length, header)->this.onInitialClientMessage(finalPublication,
buffer,
offset,
length,
header));
final FragmentHandler initialConnectionHandler = handler;
final Subscription initialConnectionSubscription = subscription;
while (true) {
this.executor.execute(()->{
initialConnectionSubscription.poll(initialConnectionHandler, 100); // this checks to see if there are NEW clients
this.clients.poll(); // this manages existing clients
});
try {
Thread.sleep(100L);
} catch (final InterruptedException e) {
Thread.currentThread()
.interrupt();
}
}
} finally {
if (publication != null) {
publication.close();
}
if (subscription != null) {
subscription.close();
}
}
// we now BLOCK until the stop method is called.
// if we want to continue running code in the server, bind should be called in a separate, non-daemon thread.
// if (blockUntilTerminate) {
@ -265,32 +291,6 @@ class Server<C extends Connection> extends EndPointServer {
});
}
/**
* Configure the publication for the "all-clients" channel.
*/
private
Publication setupAllClientsPublication() {
return EchoChannels.createPublicationDynamicMDC(this.aeron,
this.config.listenIpAddress,
this.config.controlPort,
UDP_STREAM_ID);
}
/**
* Configure the subscription for the "all-clients" channel.
*/
private
Subscription setupAllClientsSubscription() {
return EchoChannels.createSubscriptionWithHandlers(this.aeron,
this.config.listenIpAddress,
this.config.port,
UDP_STREAM_ID,
this::onInitialClientConnected,
this::onInitialClientDisconnected);
}
private
void onInitialClientConnected(final Image image) {
this.executor.execute(()->{

View File

@ -156,7 +156,6 @@ class EchoServerDuologue implements AutoCloseable {
/**
* Poll the duologue for activity.
*/
public
void poll() {
this.exec.assertIsExecutorThread();

View File

@ -45,8 +45,6 @@ import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
@ -315,10 +313,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
public final
void ping0(PingMessage ping) {
if (this.channelWrapper.udp() != null) {
UDP(ping).flush();
UDP(ping);
}
else if (this.channelWrapper.tcp() != null) {
TCP(ping).flush();
TCP(ping);
}
else {
self(ping);
@ -484,36 +482,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
}
}
/**
* Flushes the contents of the TCP/UDP/etc pipes to the actual transport.
*/
final
void flush() {
this.channelWrapper.flush();
}
/**
* Invoked when a {@link Channel} has been idle for a while.
*/
@Override
public
void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception {
// if (e.getState() == IdleState.READER_IDLE) {
// e.getChannel().close();
// } else if (e.getState() == IdleState.WRITER_IDLE) {
// e.getChannel().write(new Object());
// } else
if (event instanceof IdleStateEvent) {
if (((IdleStateEvent) event).state() == IdleState.ALL_IDLE) {
// will auto-flush if necessary
this.sessionManager.onIdle(this);
}
}
super.userEventTriggered(context, event);
}
/**
* @param context can be NULL when running deferred messages from registration process.
* @param message the received message
@ -698,8 +666,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
}
}
// flush any pending messages
this.channelWrapper.flush();
// close out the ping future
PingFuture pingFuture2 = this.pingFuture;

View File

@ -29,7 +29,6 @@ import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import dorkbox.network.connection.listenerManagement.OnConnectedManager;
import dorkbox.network.connection.listenerManagement.OnDisconnectedManager;
import dorkbox.network.connection.listenerManagement.OnIdleManager;
import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager;
import dorkbox.network.connection.ping.PingMessage;
import dorkbox.util.Property;
@ -69,7 +68,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
private final OnConnectedManager<C> onConnectedManager;
private final OnDisconnectedManager<C> onDisconnectedManager;
private final OnIdleManager<C> onIdleManager;
private final OnMessageReceivedManager<C> onMessageReceivedManager;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
@ -106,7 +104,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
onConnectedManager = new OnConnectedManager<C>(logger);
onDisconnectedManager = new OnDisconnectedManager<C>(logger);
onIdleManager = new OnIdleManager<C>(logger);
onMessageReceivedManager = new OnMessageReceivedManager<C>(logger);
}
@ -161,10 +158,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
onDisconnectedManager.add((Listener.OnDisconnected) listener);
found = true;
}
if (listener instanceof Listener.OnIdle) {
onIdleManager.add((Listener.OnIdle) listener);
found = true;
}
if (listener instanceof Listener.OnMessageReceived) {
onMessageReceivedManager.add((Listener.OnMessageReceived) listener);
@ -226,13 +219,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
found |= true;
}
}
if (listener instanceof Listener.OnIdle) {
int size = onIdleManager.removeWithSize((Listener.OnIdle) listener);
if (size >= 0) {
remainingListeners += size;
found |= true;
}
}
if (listener instanceof Listener.OnMessageReceived) {
int size = onMessageReceivedManager.removeWithSize((Listener.OnMessageReceived) listener);
if (size >= 0) {
@ -265,7 +251,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
Listeners removeAll() {
onConnectedManager.clear();
onDisconnectedManager.clear();
onIdleManager.clear();
onMessageReceivedManager.clear();
logger.trace("ALL listeners removed !!");
@ -305,8 +290,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
/**
* Invoked when a message object was received from a remote peer.
* <p/>
* If data is sent in response to this event, the connection data is automatically flushed to the wire. If the data is sent in a separate thread,
* {@link EndPoint#send().flush()} must be called manually.
* If data is sent in response to this event, the connection data is automatically flushed to the wire.
* <p/>
* {@link ISessionManager}
*/
@ -344,9 +328,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
if (connection.manageRmi(message)) {
// if we are an RMI message/registration, we have very specific, defined behavior. We do not use the "normal" listener callback pattern
// because these methods are rare, and require special functionality
// make sure we flush the message to the socket!
connection.flush();
return true;
}
@ -367,10 +348,7 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
}
// only run a flush once
if (foundListener) {
connection.flush();
}
else {
if (!foundListener) {
this.logger.warn("----------- LISTENER NOT REGISTERED FOR TYPE: {}",
message.getClass()
.getSimpleName());
@ -378,28 +356,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
return foundListener;
}
/**
* Invoked when a Connection has been idle for a while.
*/
@Override
public final
void onIdle(final ConnectionImpl connection) {
boolean foundListener = onIdleManager.notifyIdle((C) connection, shutdown);
if (foundListener) {
connection.flush();
}
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
ConnectionManager localManager = localManagers.get(connection);
if (localManager != null) {
localManager.onIdle(connection);
}
}
/**
* Invoked when a Channel is open, bound to a local address, and connected to a remote address.
*/
@ -410,10 +366,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
boolean foundListener = onConnectedManager.notifyConnected((C) connection, shutdown);
if (foundListener) {
connection.flush();
}
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
final IdentityMap<Connection, ConnectionManager> localManagers = localManagersREF.get(this);
@ -433,10 +385,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
boolean foundListener = onDisconnectedManager.notifyDisconnected((C) connection);
if (foundListener) {
connection.flush();
}
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
@ -633,7 +581,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
onConnectedManager.clear();
onDisconnectedManager.clear();
onIdleManager.clear();
onMessageReceivedManager.clear();
}
@ -822,22 +769,6 @@ class ConnectionManager<C extends Connection> implements Listeners, ISessionMana
return this;
}
/**
* Flushes the contents of the TCP/UDP/etc pipes to the actual transport socket.
*/
@Override
public
void flush() {
ConcurrentEntry<ConnectionImpl> current = connectionsREF.get(this);
ConnectionImpl c;
while (current != null) {
c = current.getValue();
current = current.next();
c.flush();
}
}
@Override
public
boolean equals(final Object o) {

View File

@ -30,11 +30,6 @@ interface ConnectionPoint {
*/
void write(Object object) throws Exception;
/**
* Flushes the contents of the TCP/UDP/etc pipes to the actual transport socket.
*/
void flush();
/**
* Creates a new promise associated with this connection type
*/

View File

@ -408,7 +408,7 @@ After this command is executed the new disk will be mounted under /Volumes/DevSh
}
/**
* This method allows the connections used by the client/server to be subclassed (custom implementations).
* This method allows the connections used by the client/server to be subclassed (with custom implementations).
* <p/>
* As this is for the network stack, the new connection MUST subclass {@link ConnectionImpl}
* <p/>
@ -487,17 +487,6 @@ After this command is executed the new disk will be mounted under /Volumes/DevSh
return connectionManager.getConnections();
}
/**
* Closes all connections ONLY (keeps the server/client running). To STOP the client/server, use stop().
* <p/>
* This is used, for example, when reconnecting to a server.
* <p/>
* The server should ALWAYS use STOP.
*/
void closeConnections(boolean shouldKeepListeners) {
}
/**
* Creates a "global" RMI object for use by multiple connections.
*

View File

@ -65,7 +65,6 @@ class EndPointClient extends EndPoint<ClientConfiguration> {
public
ConnectionPoint self(Object message) {
ConnectionPoint self = connection.self(message);
connection.flush();
return self;
}
@ -74,7 +73,6 @@ class EndPointClient extends EndPoint<ClientConfiguration> {
public
ConnectionPoint TCP(Object message) {
ConnectionPoint tcp = connection.TCP(message);
connection.flush();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// INSIDE the event loop
@ -87,7 +85,6 @@ class EndPointClient extends EndPoint<ClientConfiguration> {
public
ConnectionPoint UDP(Object message) {
ConnectionPoint udp = connection.UDP(message);
connection.flush();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// INSIDE the event loop
@ -99,7 +96,6 @@ class EndPointClient extends EndPoint<ClientConfiguration> {
public
Ping ping() {
Ping ping = connection.ping();
connection.flush();
return ping;
}
};
@ -131,7 +127,6 @@ class EndPointClient extends EndPoint<ClientConfiguration> {
public
ConnectionPoint send(final Object message) {
ConnectionPoint send = connection.send(message);
send.flush();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// INSIDE the event loop

View File

@ -26,13 +26,6 @@ interface ISessionManager {
*/
void onMessage(ConnectionImpl connection, Object message);
/**
* Called when the connection has been idle (read & write) for 2 seconds.
* <p>
* Will auto-flush the connection queue if necessary.
*/
void onIdle(ConnectionImpl connection);
/**
* Invoked when a Channel is open, bound to a local address, and connected to a remote address.
*/

View File

@ -23,16 +23,13 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener.OnConnected;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* Called when the remote computer has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
public final
class OnConnectedManager<C extends Connection> extends ConcurrentManager<C, OnConnected<C>> {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
public
OnConnectedManager(final Logger logger) {
super(logger);

View File

@ -1,50 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener.OnIdle;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
public final
class OnIdleManager<C extends Connection> extends ConcurrentManager<C, OnIdle<C>> {
public
OnIdleManager(final Logger logger) {
super(logger);
}
/**
* @return true if a listener was found, false otherwise
*/
public
boolean notifyIdle(final C connection, final AtomicBoolean shutdown) {
return doAction(connection, shutdown);
}
@Override
void listenerAction(final C connection, final OnIdle<C> listener) throws Exception {
listener.idle(connection);
}
}

View File

@ -76,18 +76,6 @@ class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
return this;
}
/**
* Flushes the contents of the LOCAL pipes to the actual transport.
*/
@Override
public
void flush() {
if (this.shouldFlush.compareAndSet(true, false)) {
this.channel.flush();
}
}
@Override
public
<V> Promise<V> newPromise() {

View File

@ -56,14 +56,6 @@ class ChannelNetwork implements ConnectionPoint {
return channel.isWritable();
}
@Override
public
void flush() {
if (shouldFlush.compareAndSet(true, false)) {
channel.flush();
}
}
@Override
public
<V> Promise<V> newPromise() {

View File

@ -85,21 +85,6 @@ class ChannelNetworkWrapper implements ChannelWrapper {
return this.udp;
}
/**
* Flushes the contents of the TCP/UDP/etc pipes to the actual transport.
*/
@Override
public
void flush() {
if (this.tcp != null) {
this.tcp.flush();
}
if (this.udp != null) {
this.udp.flush();
}
}
/**
* @return the AES key.
*/

View File

@ -38,11 +38,6 @@ class ChannelNull implements ConnectionPoint {
void write(Object object) {
}
@Override
public
void flush() {
}
/**
* @return true if the channel is writable. Useful when sending large amounts of data at once.
*/

View File

@ -27,11 +27,6 @@ interface ChannelWrapper {
ConnectionPoint tcp();
ConnectionPoint udp();
/**
* Flushes the contents of the TCP/UDP/etc pipes to the actual transport.
*/
void flush();
/**
* @return the AES key.
*/

View File

@ -79,6 +79,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
abstract void registration(final ConnectionImpl connection, final RmiRegistration message);
@Override
public
void close() {
// proxy listeners are cleared in the removeAll() call (which happens BEFORE close)
@ -90,11 +91,13 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
/**
* This will remove the invoke and invoke response listeners for this remote object
*/
@Override
public
void removeAllListeners() {
proxyListeners.clear();
}
@Override
public
<Iface> void createRemoteObject(final ConnectionImpl connection, final Class<Iface> interfaceClass, final RemoteObjectCallback<Iface> callback) {
if (!interfaceClass.isInterface()) {
@ -112,9 +115,10 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are creating a NEW object on the server, bound access to only this connection
connection.send(message).flush();
connection.send(message);
}
@Override
public
<Iface> void getRemoteObject(final ConnectionImpl connection, final int objectId, final RemoteObjectCallback<Iface> callback) {
if (objectId < 0) {
@ -137,12 +141,13 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
// have to wait for the object to be created + ID to be assigned on the remote system BEFORE we can create the proxy instance here.
// this means we are getting an EXISTING object on the server, bound access to only this connection
connection.send(message).flush();
connection.send(message);
}
/**
* Manages the RMI stuff for a connection.
*/
@Override
public
boolean manage(final ConnectionImpl connection, final Object message) {
if (message instanceof InvokeMethod) {
@ -167,7 +172,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger);
if (result != null) {
// System.err.println("Sending: " + invokeMethod.responseID);
connection.send(result).flush();
connection.send(result);
}
} catch (IOException e) {
@ -207,6 +212,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
*
* @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID.
*/
@Override
public
<T> int getRegisteredId(final T object) {
// always check global before checking local, because less contention on the synchronization
@ -225,6 +231,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
*
* @param objectId this is the RMI object ID
*/
@Override
public
Object getImplementationObject(final int objectId) {
if (RmiBridge.isGlobal(objectId)) {
@ -363,6 +370,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
* @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID
* @param iFace this is the RMI interface
*/
@Override
public
RemoteObject getProxyObject(final int rmiId, final Class<?> iFace) {
if (iFace == null) {

View File

@ -297,11 +297,11 @@ class RmiProxyHandler implements InvocationHandler {
// Sends our invokeMethod to the remote connection, which the RmiBridge listens for
if (this.udp) {
// flush is necessary in case this is called outside of a network worker thread
this.connection.UDP(invokeMethod).flush();
this.connection.UDP(invokeMethod);
}
else {
// flush is necessary in case this is called outside of a network worker thread
this.connection.send(invokeMethod).flush();
this.connection.send(invokeMethod);
}
if (logger.isTraceEnabled()) {