Fixed issues with client reconnecting (it was flaky)

This commit is contained in:
nathan 2016-03-17 02:57:42 +01:00
parent fe8e21c449
commit 05291d61a3
3 changed files with 57 additions and 42 deletions

View File

@ -467,19 +467,6 @@ class Client<C extends Connection> extends EndPointClient<C> implements Connecti
return this.connection;
}
/**
* Closes all connections ONLY (keeps the client running). To STOP the client, use stop().
* <p/>
* This is used, for example, when reconnecting to a server.
*/
@Override
public
void closeConnections() {
synchronized (this.registrationLock) {
this.registrationLock.notify();
}
}
/**
* Closes all connections ONLY (keeps the client running). To STOP the client, use stop().
* <p/>

View File

@ -542,9 +542,43 @@ class EndPoint<C extends Connection> {
// stop does the same as this + more
this.connectionManager.closeConnections();
// Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed.
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
channelMap.clear();
} finally {
registrationWrapper2.releaseChannelMap();
}
this.isConnected.set(false);
}
// server only does this on stop. Client does this on closeConnections
protected void shutdownChannels() {
synchronized (shutdownChannelList) {
// now we stop all of our channels
for (ChannelFuture f : this.shutdownChannelList) {
Channel channel = f.channel();
channel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
// we have to clear the shutdown list. (
this.shutdownChannelList.clear();
}
}
protected final
String stopWithErrorMessage(Logger logger2, String errorMessage, Throwable throwable) {
if (logger2.isDebugEnabled() && throwable != null) {
@ -643,41 +677,14 @@ class EndPoint<C extends Connection> {
}
}
// this does a closeConnections + clear_listeners
this.connectionManager.stop();
// Sometimes there might be "lingering" connections (ie, halfway though registration) that need to be closed.
long maxShutdownWaitTimeInMilliSeconds = EndPoint.maxShutdownWaitTimeInMilliSeconds;
RegistrationWrapper<C> registrationWrapper2 = this.registrationWrapper;
try {
IntMap<MetaChannel> channelMap = registrationWrapper2.getAndLockChannelMap();
Entries<MetaChannel> entries = channelMap.entries();
while (entries.hasNext()) {
MetaChannel metaChannel = entries.next().value;
metaChannel.close(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
channelMap.clear();
} finally {
registrationWrapper2.releaseChannelMap();
}
shutdownChannels();
// shutdown the database store
this.propertyStore.close();
// now we stop all of our channels
for (ChannelFuture f : this.shutdownChannelList) {
Channel channel = f.channel();
channel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
// we have to clear the shutdown list.
this.shutdownChannelList.clear();
// we want to WAIT until after the event executors have completed shutting down.
List<Future<?>> shutdownThreadList = new LinkedList<Future<?>>();

View File

@ -205,6 +205,27 @@ class EndPointClient<C extends Connection> extends EndPoint<C> implements Runnab
return this.connectionBridgeFlushAlways;
}
/**
* Closes all connections ONLY (keeps the client running). To STOP the client, use stop().
* <p/>
* This is used, for example, when reconnecting to a server.
*/
@Override
public
void closeConnections() {
super.closeConnections();
// for the CLIENT only, we clear these connections! (the server only clears them on shutdown)
shutdownChannels();
// make sure we're not waiting on registration
registrationComplete = true;
synchronized (this.registrationLock) {
this.registrationLock.notify();
}
registrationComplete = false;
}
/**
* Internal call to abort registration if the shutdown command is issued during channel registration.
*/