From f091b7d60427453ce1f6cb576863c6988b76800c Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 30 Jul 2017 11:28:33 +0200 Subject: [PATCH] Fixed issues when closing a channel that was already closed. --- .../network/connection/ConnectionManager.java | 22 ++++++--- src/dorkbox/network/connection/EndPoint.java | 48 +++++++++++-------- .../connection/registration/MetaChannel.java | 15 +++--- 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index 8333bab8..48679b2c 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -15,7 +15,15 @@ */ package dorkbox.network.connection; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.slf4j.Logger; + import com.esotericsoftware.kryo.util.IdentityMap; + import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; import dorkbox.network.connection.listenerManagement.OnConnectedManager; @@ -25,12 +33,6 @@ import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager; import dorkbox.util.ClassHelper; import dorkbox.util.Property; import dorkbox.util.collections.ConcurrentEntry; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. @SuppressWarnings("unchecked") @@ -650,6 +652,7 @@ class ConnectionManager implements ListenerBridge, ISessio * * @see dorkbox.network.connection.ConnectionPoint#flush() */ + @Override public void flush() { ConcurrentEntry current = connectionsREF.get(this); @@ -667,6 +670,7 @@ class ConnectionManager implements ListenerBridge, ISessio * Sends the object to all server connections (except the specified one) over the network using TCP. (or via LOCAL when it's a local * channel). */ + @Override public ConnectionPoint TCP(final C connection, final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -687,6 +691,7 @@ class ConnectionManager implements ListenerBridge, ISessio * Sends the object to all server connections (except the specified one) over the network using UDP (or via LOCAL when it's a local * channel). */ + @Override public ConnectionPoint UDP(final C connection, final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -707,6 +712,7 @@ class ConnectionManager implements ListenerBridge, ISessio * Sends the object to all server connections (except the specified one) over the network using UDT. (or via LOCAL when it's a local * channel). */ + @Override public ConnectionPoint UDT(final C connection, final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -726,6 +732,7 @@ class ConnectionManager implements ListenerBridge, ISessio /** * Sends the message to other listeners INSIDE this endpoint for EVERY connection. It does not send it to a remote address. */ + @Override public void self(final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -741,6 +748,7 @@ class ConnectionManager implements ListenerBridge, ISessio /** * Sends the object all server connections over the network using TCP. (or via LOCAL when it's a local channel). */ + @Override public ConnectionPoint TCP(final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -758,6 +766,7 @@ class ConnectionManager implements ListenerBridge, ISessio /** * Sends the object all server connections over the network using UDP. (or via LOCAL when it's a local channel). */ + @Override public ConnectionPoint UDP(final Object message) { ConcurrentEntry current = connectionsREF.get(this); @@ -776,6 +785,7 @@ class ConnectionManager implements ListenerBridge, ISessio /** * Sends the object all server connections over the network using UDT. (or via LOCAL when it's a local channel). */ + @Override public ConnectionPoint UDT(final Object message) { ConcurrentEntry current = connectionsREF.get(this); diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index c98fa319..f821a265 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -15,6 +15,24 @@ */ package dorkbox.network.connection; +import java.io.IOException; +import java.security.AccessControlException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.bouncycastle.crypto.AsymmetricCipherKeyPair; +import org.bouncycastle.crypto.params.ECPrivateKeyParameters; +import org.bouncycastle.crypto.params.ECPublicKeyParameters; +import org.slf4j.Logger; + import dorkbox.network.Configuration; import dorkbox.network.connection.bridge.ConnectionBridgeBase; import dorkbox.network.connection.ping.PingSystemListener; @@ -41,22 +59,6 @@ import io.netty.util.NetUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.internal.PlatformDependent; -import org.bouncycastle.crypto.AsymmetricCipherKeyPair; -import org.bouncycastle.crypto.params.ECPrivateKeyParameters; -import org.bouncycastle.crypto.params.ECPublicKeyParameters; -import org.slf4j.Logger; - -import java.io.IOException; -import java.security.AccessControlException; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * represents the base of a client/server end point @@ -566,9 +568,11 @@ class EndPoint { // now we stop all of our channels for (ChannelFuture f : this.shutdownChannelList) { Channel channel = f.channel(); - channel.close() - .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); - Thread.yield(); + if (channel.isOpen()) { + channel.close() + .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); + Thread.yield(); + } } // we have to clear the shutdown list. ( @@ -622,8 +626,10 @@ class EndPoint { // a client/server thread executor, it will deadlock while waiting for the threadpool to terminate. boolean isInEventLoop = false; for (EventLoopGroup loopGroup : this.eventLoopGroups) { - for (EventExecutor child : loopGroup.children()) { - if (child.inEventLoop()) { + Iterator iterator = loopGroup.iterator(); + while (iterator.hasNext()) { + EventExecutor next = iterator.next(); + if (next.inEventLoop()) { isInEventLoop = true; break; } diff --git a/src/dorkbox/network/connection/registration/MetaChannel.java b/src/dorkbox/network/connection/registration/MetaChannel.java index 28b6ede9..9a57bd37 100644 --- a/src/dorkbox/network/connection/registration/MetaChannel.java +++ b/src/dorkbox/network/connection/registration/MetaChannel.java @@ -15,12 +15,13 @@ */ package dorkbox.network.connection.registration; -import dorkbox.network.connection.ConnectionImpl; -import io.netty.channel.Channel; +import java.net.InetSocketAddress; + import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.params.ECPublicKeyParameters; -import java.net.InetSocketAddress; +import dorkbox.network.connection.ConnectionImpl; +import io.netty.channel.Channel; // @formatter:off public @@ -80,22 +81,22 @@ class MetaChannel { public void close(final long maxShutdownWaitTimeInMilliSeconds) { - if (this.localChannel != null) { + if (this.localChannel != null && this.localChannel.isOpen()) { this.localChannel.close(); } - if (this.tcpChannel != null) { + if (this.tcpChannel != null && this.tcpChannel.isOpen()) { this.tcpChannel.close() .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); } - if (this.udtChannel != null) { + if (this.udtChannel != null && this.udtChannel.isOpen()) { this.udtChannel.close() .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); } // only the CLIENT will have this. - if (this.udpChannel != null && this.udpRemoteAddress == null) { + if (this.udpChannel != null && this.udpRemoteAddress == null && this.udpChannel.isOpen()) { this.udpChannel.close() .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); }