Fixed issues when closing a channel that was already closed.

This commit is contained in:
nathan 2017-07-30 11:28:33 +02:00
parent 8f5558c95c
commit f091b7d604
3 changed files with 51 additions and 34 deletions

View File

@ -15,7 +15,15 @@
*/ */
package dorkbox.network.connection; package dorkbox.network.connection;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.util.IdentityMap; import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import dorkbox.network.connection.listenerManagement.OnConnectedManager; import dorkbox.network.connection.listenerManagement.OnConnectedManager;
@ -25,12 +33,6 @@ import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager;
import dorkbox.util.ClassHelper; import dorkbox.util.ClassHelper;
import dorkbox.util.Property; import dorkbox.util.Property;
import dorkbox.util.collections.ConcurrentEntry; import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -650,6 +652,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* *
* @see dorkbox.network.connection.ConnectionPoint#flush() * @see dorkbox.network.connection.ConnectionPoint#flush()
*/ */
@Override
public public
void flush() { void flush() {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -667,6 +670,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local
* channel). * channel).
*/ */
@Override
public public
ConnectionPoint TCP(final C connection, final Object message) { ConnectionPoint TCP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -687,6 +691,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local
* channel). * channel).
*/ */
@Override
public public
ConnectionPoint UDP(final C connection, final Object message) { ConnectionPoint UDP(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -707,6 +712,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
* Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local
* channel). * channel).
*/ */
@Override
public public
ConnectionPoint UDT(final C connection, final Object message) { ConnectionPoint UDT(final C connection, final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -726,6 +732,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
/** /**
* Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address. * Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address.
*/ */
@Override
public public
void self(final Object message) { void self(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -741,6 +748,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
/** /**
* Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel). * Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel).
*/ */
@Override
public public
ConnectionPoint TCP(final Object message) { ConnectionPoint TCP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -758,6 +766,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
/** /**
* Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel). * Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel).
*/ */
@Override
public public
ConnectionPoint UDP(final Object message) { ConnectionPoint UDP(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);
@ -776,6 +785,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
/** /**
* Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel). * Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel).
*/ */
@Override
public public
ConnectionPoint UDT(final Object message) { ConnectionPoint UDT(final Object message) {
ConcurrentEntry<C> current = connectionsREF.get(this); ConcurrentEntry<C> current = connectionsREF.get(this);

View File

@ -15,6 +15,24 @@
*/ */
package dorkbox.network.connection; package dorkbox.network.connection;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.slf4j.Logger;
import dorkbox.network.Configuration; import dorkbox.network.Configuration;
import dorkbox.network.connection.bridge.ConnectionBridgeBase; import dorkbox.network.connection.bridge.ConnectionBridgeBase;
import dorkbox.network.connection.ping.PingSystemListener; import dorkbox.network.connection.ping.PingSystemListener;
@ -41,22 +59,6 @@ import io.netty.util.NetUtil;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import org.slf4j.Logger;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* represents the base of a client/server end point * represents the base of a client/server end point
@ -566,9 +568,11 @@ class EndPoint<C extends Connection> {
// now we stop all of our channels // now we stop all of our channels
for (ChannelFuture f : this.shutdownChannelList) { for (ChannelFuture f : this.shutdownChannelList) {
Channel channel = f.channel(); Channel channel = f.channel();
channel.close() if (channel.isOpen()) {
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); channel.close()
Thread.yield(); .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
Thread.yield();
}
} }
// we have to clear the shutdown list. ( // we have to clear the shutdown list. (
@ -622,8 +626,10 @@ class EndPoint<C extends Connection> {
// a client/server thread executor, it will deadlock while waiting for the threadpool to terminate. // a client/server thread executor, it will deadlock while waiting for the threadpool to terminate.
boolean isInEventLoop = false; boolean isInEventLoop = false;
for (EventLoopGroup loopGroup : this.eventLoopGroups) { for (EventLoopGroup loopGroup : this.eventLoopGroups) {
for (EventExecutor child : loopGroup.children()) { Iterator<EventExecutor> iterator = loopGroup.iterator();
if (child.inEventLoop()) { while (iterator.hasNext()) {
EventExecutor next = iterator.next();
if (next.inEventLoop()) {
isInEventLoop = true; isInEventLoop = true;
break; break;
} }

View File

@ -15,12 +15,13 @@
*/ */
package dorkbox.network.connection.registration; package dorkbox.network.connection.registration;
import dorkbox.network.connection.ConnectionImpl; import java.net.InetSocketAddress;
import io.netty.channel.Channel;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPublicKeyParameters; import org.bouncycastle.crypto.params.ECPublicKeyParameters;
import java.net.InetSocketAddress; import dorkbox.network.connection.ConnectionImpl;
import io.netty.channel.Channel;
// @formatter:off // @formatter:off
public public
@ -80,22 +81,22 @@ class MetaChannel {
public public
void close(final long maxShutdownWaitTimeInMilliSeconds) { void close(final long maxShutdownWaitTimeInMilliSeconds) {
if (this.localChannel != null) { if (this.localChannel != null && this.localChannel.isOpen()) {
this.localChannel.close(); this.localChannel.close();
} }
if (this.tcpChannel != null) { if (this.tcpChannel != null && this.tcpChannel.isOpen()) {
this.tcpChannel.close() this.tcpChannel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }
if (this.udtChannel != null) { if (this.udtChannel != null && this.udtChannel.isOpen()) {
this.udtChannel.close() this.udtChannel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }
// only the CLIENT will have this. // only the CLIENT will have this.
if (this.udpChannel != null && this.udpRemoteAddress == null) { if (this.udpChannel != null && this.udpRemoteAddress == null && this.udpChannel.isOpen()) {
this.udpChannel.close() this.udpChannel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }