Added backpressure for writing to the socket for the CLIENT (not for listeners, which are in the eventloop)

This commit is contained in:
nathan 2016-02-29 21:23:02 +01:00
parent 5e67cc1813
commit 28c36233c4
3 changed files with 154 additions and 108 deletions

View File

@ -66,8 +66,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
private final org.slf4j.Logger logger; private final org.slf4j.Logger logger;
private final AtomicBoolean writableSignalNeeded = new AtomicBoolean(false); private final AtomicBoolean needsLock = new AtomicBoolean(false);
private final Object writableLock = new Object(); private final AtomicBoolean writeSignalNeeded = new AtomicBoolean(false);
private final Object writeLock = new Object();
private final AtomicBoolean closeInProgress = new AtomicBoolean(false); private final AtomicBoolean closeInProgress = new AtomicBoolean(false);
private final AtomicBoolean alreadyClosed = new AtomicBoolean(false); private final AtomicBoolean alreadyClosed = new AtomicBoolean(false);
@ -284,9 +285,10 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
super.channelWritabilityChanged(ctx); super.channelWritabilityChanged(ctx);
// needed to place back-pressure when writing too much data to the connection // needed to place back-pressure when writing too much data to the connection
if (writableSignalNeeded.getAndSet(false)) { if (writeSignalNeeded.getAndSet(false)) {
synchronized (writableLock) { synchronized (writeLock) {
writableLock.notifyAll(); needsLock.set(false);
writeLock.notifyAll();
} }
} }
} }
@ -297,15 +299,20 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
* This blocks until we are writable again * This blocks until we are writable again
*/ */
private private
void controlBackPressure(ConnectionPointWriter c) { void controlBackPressure(ConnectionPoint c) {
if (!c.isWritable()) { while (!c.isWritable()) {
writableSignalNeeded.set(true); needsLock.set(true);
writeSignalNeeded.set(true);
synchronized (writableLock) { synchronized (writeLock) {
try { if (needsLock.get()) {
writableLock.wait(); try {
} catch (InterruptedException e) { // waits 1 second maximum per check. This is to guarantee that eventually (in the case of deadlocks, which i've seen)
e.printStackTrace(); // it will get released. The while loop makes sure it will exit when the channel is writable
writeLock.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }
} }
@ -336,18 +343,18 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
/** /**
* Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP)
*/ */
@Override final
public final ConnectionPoint TCP_backpressure(Object message) {
ConnectionPoint TCP(Object message) {
Logger logger2 = this.logger; Logger logger2 = this.logger;
if (!this.closeInProgress.get()) { if (!this.closeInProgress.get()) {
if (logger2.isTraceEnabled()) { if (logger2.isTraceEnabled()) {
logger2.trace("Sending TCP {}", message); logger2.trace("Sending TCP {}", message);
} }
ConnectionPointWriter tcp = this.channelWrapper.tcp(); ConnectionPointWriter tcp = this.channelWrapper.tcp();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// needed to place back-pressure when writing too much data to the connection // INSIDE the event loop
controlBackPressure(tcp); controlBackPressure(tcp);
tcp.write(message); tcp.write(message);
return tcp; return tcp;
} }
@ -361,7 +368,58 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
} }
/** /**
* Sends the object over the network using UDP (or via LOCAL when it's a local channel). * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP)
*/
@Override
public final
ConnectionPoint TCP(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
if (logger2.isTraceEnabled()) {
logger2.trace("Sending TCP {}", message);
}
ConnectionPointWriter tcp = this.channelWrapper.tcp();
tcp.write(message);
return tcp;
}
else {
if (logger2.isDebugEnabled()) {
logger2.debug("writing TCP while closed: {}", message);
}
// we have to return something, otherwise dependent code will throw a null pointer exception
return ChannelNull.get();
}
}
/**
* Sends the object over the network using UDP (LOCAL channels do not care if its TCP or UDP)
*/
final
ConnectionPoint UDP_backpressure(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
if (logger2.isTraceEnabled()) {
logger2.trace("Sending UDP {}", message);
}
ConnectionPointWriter udp = this.channelWrapper.udp();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// INSIDE the event loop
controlBackPressure(udp);
udp.write(message);
return udp;
}
else {
if (logger2.isDebugEnabled()) {
logger2.debug("writing UDP while closed: {}", message);
}
// we have to return something, otherwise dependent code will throw a null pointer exception
return ChannelNull.get();
}
}
/**
* Sends the object over the network using UDP (LOCAL channels do not care if its TCP or UDP)
*/ */
@Override @Override
public public
@ -372,9 +430,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
logger2.trace("Sending UDP {}", message); logger2.trace("Sending UDP {}", message);
} }
ConnectionPointWriter udp = this.channelWrapper.udp(); ConnectionPointWriter udp = this.channelWrapper.udp();
// needed to place back-pressure when writing too much data to the connection
controlBackPressure(udp);
udp.write(message); udp.write(message);
return udp; return udp;
} }
@ -387,6 +442,33 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
} }
} }
/**
* Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP)
*/
final
ConnectionPoint UDT_backpressure(Object message) {
Logger logger2 = this.logger;
if (!this.closeInProgress.get()) {
if (logger2.isTraceEnabled()) {
logger2.trace("Sending UDT {}", message);
}
ConnectionPointWriter udt = this.channelWrapper.udt();
// needed to place back-pressure when writing too much data to the connection. Will create deadlocks if called from
// INSIDE the event loop
controlBackPressure(udt);
udt.write(message);
return udt;
}
else {
if (logger2.isDebugEnabled()) {
logger2.debug("writing UDT while closed: {}", message);
}
// we have to return something, otherwise dependent code will throw a null pointer exception
return ChannelNull.get();
}
}
/** /**
* Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP) * Sends the object over the network using TCP. (LOCAL channels do not care if its TCP or UDP)
*/ */
@ -399,8 +481,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
logger2.trace("Sending UDT {}", message); logger2.trace("Sending UDT {}", message);
} }
ConnectionPointWriter udt = this.channelWrapper.udt(); ConnectionPointWriter udt = this.channelWrapper.udt();
// needed to place back-pressure when writing too much data to the connection
controlBackPressure(udt);
udt.write(message); udt.write(message);
return udt; return udt;
} }

View File

@ -18,7 +18,6 @@ package dorkbox.network.connection;
import dorkbox.network.Client; import dorkbox.network.Client;
import dorkbox.network.Configuration; import dorkbox.network.Configuration;
import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.bridge.ConnectionBridge;
import dorkbox.network.connection.bridge.ConnectionBridgeFlushAlways;
import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException; import dorkbox.util.exceptions.SecurityException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -44,7 +43,7 @@ class EndPointClient<C extends Connection> extends EndPoint<C> implements Runnab
protected volatile int connectionTimeout = 5000; // default protected volatile int connectionTimeout = 5000; // default
protected volatile boolean registrationComplete = false; protected volatile boolean registrationComplete = false;
private volatile ConnectionBridgeFlushAlways connectionBridgeFlushAlways; private volatile ConnectionBridge connectionBridgeFlushAlways;
public public
@ -134,10 +133,58 @@ class EndPointClient<C extends Connection> extends EndPoint<C> implements Runnab
*/ */
@Override @Override
final final
void connectionConnected0(ConnectionImpl connection) { void connectionConnected0(final ConnectionImpl connection) {
// invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need. // invokes the listener.connection() method, and initialize the connection channels with whatever extra info they might need.
super.connectionConnected0(connection); super.connectionConnected0(connection);
this.connectionBridgeFlushAlways = new ConnectionBridge() {
@Override
public
void self(Object message) {
connection.self(message);
flush();
}
@Override
public
ConnectionPoint TCP(Object message) {
ConnectionPoint tcp = connection.TCP_backpressure(message);
tcp.flush();
return tcp;
}
@Override
public
ConnectionPoint UDP(Object message) {
ConnectionPoint udp = connection.UDP_backpressure(message);
udp.flush();
return udp;
}
@Override
public
ConnectionPoint UDT(Object message) {
ConnectionPoint udt = connection.UDT_backpressure(message);
udt.flush();
return udt;
}
@Override
public
Ping ping() {
Ping ping = connection.ping();
flush();
return ping;
}
@Override
public
void flush() {
connection.flush();
}
};
// notify the registration we are done! // notify the registration we are done!
synchronized (this.registrationLock) { synchronized (this.registrationLock) {
this.registrationLock.notify(); this.registrationLock.notify();
@ -153,12 +200,6 @@ class EndPointClient<C extends Connection> extends EndPoint<C> implements Runnab
@Override @Override
public public
ConnectionBridge send() { ConnectionBridge send() {
ConnectionBridgeFlushAlways connectionBridgeFlushAlways2 = this.connectionBridgeFlushAlways;
if (connectionBridgeFlushAlways2 == null) {
ConnectionBridge clientBridge = this.connection.send();
this.connectionBridgeFlushAlways = new ConnectionBridgeFlushAlways(clientBridge);
}
return this.connectionBridgeFlushAlways; return this.connectionBridgeFlushAlways;
} }

View File

@ -1,75 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.bridge;
import dorkbox.network.connection.ConnectionPoint;
import dorkbox.network.connection.Ping;
public
class ConnectionBridgeFlushAlways implements ConnectionBridge {
private final ConnectionBridge originalBridge;
public
ConnectionBridgeFlushAlways(ConnectionBridge originalBridge) {
this.originalBridge = originalBridge;
}
@Override
public
void self(Object message) {
this.originalBridge.self(message);
flush();
}
@Override
public
ConnectionPoint TCP(Object message) {
ConnectionPoint connection = this.originalBridge.TCP(message);
connection.flush();
return connection;
}
@Override
public
ConnectionPoint UDP(Object message) {
ConnectionPoint connection = this.originalBridge.UDP(message);
connection.flush();
return connection;
}
@Override
public
ConnectionPoint UDT(Object message) {
ConnectionPoint connection = this.originalBridge.UDT(message);
connection.flush();
return connection;
}
@Override
public
Ping ping() {
Ping ping = this.originalBridge.ping();
flush();
return ping;
}
@Override
public
void flush() {
this.originalBridge.flush();
}
}