Added notes, code polish, connect now times-out instead of waiting

forever
This commit is contained in:
nathan 2018-01-12 17:18:09 +01:00
parent 331ca0e36c
commit d39930a700

View File

@ -57,7 +57,7 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
protected protected
void registerNextProtocol() { void registerNextProtocol() {
// always reset everything. // always reset everything.
this.registrationComplete = false; registrationComplete = false;
bootstrapIterator = bootstraps.iterator(); bootstrapIterator = bootstraps.iterator();
startProtocolRegistration(); startProtocolRegistration();
@ -79,7 +79,8 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
@Override @Override
public public
void run() { void run() {
synchronized (this.bootstrapLock) { // NOTE: Throwing exceptions in this method is pointless, since it runs from it's own thread
synchronized (bootstrapLock) {
if (isRegistrationComplete()) { if (isRegistrationComplete()) {
return; return;
} }
@ -88,9 +89,9 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
ChannelFuture future; ChannelFuture future;
if (this.connectionTimeout != 0) { if (connectionTimeout != 0) {
// must be before connect // must be before connect
bootstrapWrapper.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout); bootstrapWrapper.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout);
} }
try { try {
@ -98,25 +99,39 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
// If the reply isn't from the correct port, then the other end will receive a "Port Unreachable" exception. // If the reply isn't from the correct port, then the other end will receive a "Port Unreachable" exception.
future = bootstrapWrapper.bootstrap.connect(); future = bootstrapWrapper.bootstrap.connect();
future.await(); future.await(connectionTimeout);
} catch (Exception e) { } catch (Exception e) {
String errorMessage = stopWithErrorMessage(this.logger, String errorMessage = "Could not connect to the " + bootstrapWrapper.type + " server at " + bootstrapWrapper.address + " on port: " + bootstrapWrapper.port;
"Could not connect to the " + bootstrapWrapper.type + " server at " + if (logger.isDebugEnabled()) {
bootstrapWrapper.address + " on port: " + bootstrapWrapper.port, // extra info if debug is enabled
e); logger.error(errorMessage, e);
throw new IllegalArgumentException(errorMessage); }
else {
logger.error(errorMessage);
}
return;
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
String errorMessage = stopWithErrorMessage(this.logger, Throwable cause = future.cause();
"Could not connect to the " + bootstrapWrapper.type + " server at " + if (cause instanceof java.net.ConnectException) {
bootstrapWrapper.address + " on port: " + bootstrapWrapper.port, if (cause.getMessage()
future.cause()); .contains("refused")) {
throw new IllegalArgumentException(errorMessage); String errorMessage = "Connection refused to the " + bootstrapWrapper.type + " server at " + bootstrapWrapper.address + " on port: " + bootstrapWrapper.port;
logger.error(errorMessage, cause);
}
} else {
String errorMessage = "Connection failed to the " + bootstrapWrapper.type + " server at " + bootstrapWrapper.address + " on port: " + bootstrapWrapper.port;
logger.error(errorMessage, cause);
}
return;
} }
if (this.logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
this.logger.trace("Waiting for registration from server."); logger.trace("Waiting for registration from server.");
} }
manageForShutdown(future); manageForShutdown(future);
} }
@ -130,9 +145,9 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
@Override @Override
protected protected
boolean registerNextProtocol0() { boolean registerNextProtocol0() {
synchronized (this.bootstrapLock) { synchronized (bootstrapLock) {
this.registrationComplete = isRegistrationComplete(); registrationComplete = isRegistrationComplete();
if (!this.registrationComplete) { if (!registrationComplete) {
startProtocolRegistration(); startProtocolRegistration();
} }
@ -141,13 +156,13 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
} }
if (this.logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
this.logger.trace("Registered protocol from server."); logger.trace("Registered protocol from server.");
} }
// only let us continue with connections (this starts up the client/server implementations) once ALL of the // only let us continue with connections (this starts up the client/server implementations) once ALL of the
// bootstraps have connected // bootstraps have connected
return this.registrationComplete; return registrationComplete;
} }
/** /**
@ -160,7 +175,7 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
// invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need. // invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need.
super.connectionConnected0(connection); super.connectionConnected0(connection);
this.connectionBridgeFlushAlways = new ConnectionBridge() { connectionBridgeFlushAlways = new ConnectionBridge() {
@Override @Override
public public
void self(Object message) { void self(Object message) {
@ -206,8 +221,8 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
rmiInitializationComplete = connection.rmiCallbacksIsEmpty(); rmiInitializationComplete = connection.rmiCallbacksIsEmpty();
// notify the registration we are done! // notify the registration we are done!
synchronized (this.registrationLock) { synchronized (registrationLock) {
this.registrationLock.notify(); registrationLock.notify();
} }
} }
@ -235,7 +250,7 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
@Override @Override
public public
ConnectionBridge send() { ConnectionBridge send() {
return this.connectionBridgeFlushAlways; return connectionBridgeFlushAlways;
} }
/** /**
@ -253,8 +268,8 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
// make sure we're not waiting on registration // make sure we're not waiting on registration
registrationComplete = true; registrationComplete = true;
synchronized (this.registrationLock) { synchronized (registrationLock) {
this.registrationLock.notify(); registrationLock.notify();
} }
registrationComplete = false; registrationComplete = false;
@ -268,8 +283,8 @@ class EndPointClient<C extends Connection> extends EndPointBase<C> implements Ru
* Internal call to abort registration if the shutdown command is issued during channel registration. * Internal call to abort registration if the shutdown command is issued during channel registration.
*/ */
void abortRegistration() { void abortRegistration() {
synchronized (this.registrationLock) { synchronized (registrationLock) {
this.registrationLock.notify(); registrationLock.notify();
} }
// Always unblock the waiting client.connect(). // Always unblock the waiting client.connect().