diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java index be2409a8..58fbc11c 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java @@ -66,8 +66,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, private final org.slf4j.Logger logger; - private final AtomicBoolean writableSignalNeeded = new AtomicBoolean(false); - private final Object writableLock = new Object(); + private final AtomicBoolean needsLock = new AtomicBoolean(false); + private final AtomicBoolean writeSignalNeeded = new AtomicBoolean(false); + private final Object writeLock = new Object(); private final AtomicBoolean closeInProgress = new AtomicBoolean(false); private final AtomicBoolean alreadyClosed = new AtomicBoolean(false); @@ -284,9 +285,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, super.channelWritabilityChanged(ctx); // needed to place back-pressure when writing too much data to the connection - if (writableSignalNeeded.getAndSet(false)) { - synchronized (writableLock) { - writableLock.notifyAll(); + if (writeSignalNeeded.getAndSet(false)) { + synchronized (writeLock) { + needsLock.set(false); + writeLock.notifyAll(); } } } @@ -297,15 +299,20 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, * This blocks until we are writable again */ private - void controlBackPressure(ConnectionPointWriter c) { - if (!c.isWritable()) { - writableSignalNeeded.set(true); + void controlBackPressure(ConnectionPoint c) { + while (!c.isWritable()) { + needsLock.set(true); + writeSignalNeeded.set(true); - synchronized (writableLock) { - try { - writableLock.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); + synchronized (writeLock) { + if (needsLock.get()) { + try { + // waits 1 second maximum per check. This is to guarantee that eventually (in the case of deadlocks, which i've seen) + // it will get released. The while loop makes sure it will exit when the channel is writable + writeLock.wait(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } } @@ -336,18 +343,18 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, /** * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) */ - @Override - public final - ConnectionPoint TCP(Object message) { + final + ConnectionPoint TCP_backpressure(Object message) { Logger logger2 = this.logger; if (!this.closeInProgress.get()) { if (logger2.isTraceEnabled()) { logger2.trace("Sending TCP {}", message); } ConnectionPointWriter tcp = this.channelWrapper.tcp(); - - // needed to place back-pressure when writing too much data to the connection + // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from + // INSIDE the event loop controlBackPressure(tcp); + tcp.write(message); return tcp; } @@ -361,7 +368,58 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, } /** - * Sends the object over the network using UDP (or via LOCAL when it's a local channel). + * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) + */ + @Override + public final + ConnectionPoint TCP(Object message) { + Logger logger2 = this.logger; + if (!this.closeInProgress.get()) { + if (logger2.isTraceEnabled()) { + logger2.trace("Sending TCP {}", message); + } + ConnectionPointWriter tcp = this.channelWrapper.tcp(); + tcp.write(message); + return tcp; + } + else { + if (logger2.isDebugEnabled()) { + logger2.debug("writing TCP while closed: {}", message); + } + // we have to return something, otherwise dependent code will throw a null pointer exception + return ChannelNull.get(); + } + } + + /** + * Sends the object over the network using UDP (LOCAL channels do not care if its TCP or UDP) + */ + final + ConnectionPoint UDP_backpressure(Object message) { + Logger logger2 = this.logger; + if (!this.closeInProgress.get()) { + if (logger2.isTraceEnabled()) { + logger2.trace("Sending UDP {}", message); + } + ConnectionPointWriter udp = this.channelWrapper.udp(); + // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from + // INSIDE the event loop + controlBackPressure(udp); + + udp.write(message); + return udp; + } + else { + if (logger2.isDebugEnabled()) { + logger2.debug("writing UDP while closed: {}", message); + } + // we have to return something, otherwise dependent code will throw a null pointer exception + return ChannelNull.get(); + } + } + + /** + * Sends the object over the network using UDP (LOCAL channels do not care if its TCP or UDP) */ @Override public @@ -372,9 +430,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, logger2.trace("Sending UDP {}", message); } ConnectionPointWriter udp = this.channelWrapper.udp(); - - // needed to place back-pressure when writing too much data to the connection - controlBackPressure(udp); udp.write(message); return udp; } @@ -387,6 +442,33 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, } } + /** + * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) + */ + final + ConnectionPoint UDT_backpressure(Object message) { + Logger logger2 = this.logger; + if (!this.closeInProgress.get()) { + if (logger2.isTraceEnabled()) { + logger2.trace("Sending UDT {}", message); + } + ConnectionPointWriter udt = this.channelWrapper.udt(); + // needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from + // INSIDE the event loop + controlBackPressure(udt); + + udt.write(message); + return udt; + } + else { + if (logger2.isDebugEnabled()) { + logger2.debug("writing UDT while closed: {}", message); + } + // we have to return something, otherwise dependent code will throw a null pointer exception + return ChannelNull.get(); + } + } + /** * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) */ @@ -399,8 +481,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, logger2.trace("Sending UDT {}", message); } ConnectionPointWriter udt = this.channelWrapper.udt(); - // needed to place back-pressure when writing too much data to the connection - controlBackPressure(udt); udt.write(message); return udt; } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java index 0f588146..d4558d39 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/EndPointClient.java @@ -18,7 +18,6 @@ package dorkbox.network.connection; import dorkbox.network.Client; import dorkbox.network.Configuration; import dorkbox.network.connection.bridge.ConnectionBridge; -import dorkbox.network.connection.bridge.ConnectionBridgeFlushAlways; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import io.netty.channel.ChannelFuture; @@ -44,7 +43,7 @@ class EndPointClient extends EndPoint implements Runnab protected volatile int connectionTimeout = 5000; // default protected volatile boolean registrationComplete = false; - private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways; + private volatile ConnectionBridge connectionBridgeFlushAlways; public @@ -134,10 +133,58 @@ class EndPointClient extends EndPoint implements Runnab */ @Override final - void connectionConnected0(ConnectionImpl connection) { + void connectionConnected0(final ConnectionImpl connection) { // invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need. super.connectionConnected0(connection); + this.connectionBridgeFlushAlways = new ConnectionBridge() { + @Override + public + void self(Object message) { + connection.self(message); + flush(); + } + + @Override + public + ConnectionPoint TCP(Object message) { + ConnectionPoint tcp = connection.TCP_backpressure(message); + tcp.flush(); + return tcp; + } + + @Override + public + ConnectionPoint UDP(Object message) { + ConnectionPoint udp = connection.UDP_backpressure(message); + udp.flush(); + return udp; + } + + @Override + public + ConnectionPoint UDT(Object message) { + ConnectionPoint udt = connection.UDT_backpressure(message); + udt.flush(); + return udt; + } + + @Override + public + Ping ping() { + Ping ping = connection.ping(); + flush(); + return ping; + } + + @Override + public + void flush() { + connection.flush(); + } + }; + + // notify the registration we are done! synchronized (this.registrationLock) { this.registrationLock.notify(); @@ -153,12 +200,6 @@ class EndPointClient extends EndPoint implements Runnab @Override public ConnectionBridge send() { - ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways; - if (connectionBridgeFlushAlways2 == null) { - ConnectionBridge clientBridge = this.connection.send(); - this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge); - } - return this.connectionBridgeFlushAlways; } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/bridge/ConnectionBridgeFlushAlways.java b/Dorkbox-Network/src/dorkbox/network/connection/bridge/ConnectionBridgeFlushAlways.java deleted file mode 100644 index 643a5dbb..00000000 --- a/Dorkbox-Network/src/dorkbox/network/connection/bridge/ConnectionBridgeFlushAlways.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection.bridge; - -import dorkbox.network.connection.ConnectionPoint; -import dorkbox.network.connection.Ping; - -public -class ConnectionBridgeFlushAlways implements ConnectionBridge { - - private final ConnectionBridge originalBridge; - - public - ConnectionBridgeFlushAlways(ConnectionBridge originalBridge) { - this.originalBridge = originalBridge; - } - - @Override - public - void self(Object message) { - this.originalBridge.self(message); - flush(); - } - - @Override - public - ConnectionPoint TCP(Object message) { - ConnectionPoint connection = this.originalBridge.TCP(message); - connection.flush(); - return connection; - } - - @Override - public - ConnectionPoint UDP(Object message) { - ConnectionPoint connection = this.originalBridge.UDP(message); - connection.flush(); - return connection; - } - - @Override - public - ConnectionPoint UDT(Object message) { - ConnectionPoint connection = this.originalBridge.UDT(message); - connection.flush(); - return connection; - } - - @Override - public - Ping ping() { - Ping ping = this.originalBridge.ping(); - flush(); - return ping; - } - - @Override - public - void flush() { - this.originalBridge.flush(); - } -}