Added backpressure when writing to the channel, if it's not writable

This commit is contained in:
nathan 2016-02-28 03:10:10 +01:00
parent dcd6dd5365
commit 8d1d43616a
6 changed files with 72 additions and 34 deletions

View File

@ -66,6 +66,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
private final org.slf4j.Logger logger; private final org.slf4j.Logger logger;
private final AtomicBoolean writableSignalNeeded = new AtomicBoolean(false);
private final Object writableLock = new Object();
private final AtomicBoolean closeInProgress = new AtomicBoolean(false); private final AtomicBoolean closeInProgress = new AtomicBoolean(false);
private final AtomicBoolean alreadyClosed = new AtomicBoolean(false); private final AtomicBoolean alreadyClosed = new AtomicBoolean(false);
private final Object closeInProgressLock = new Object(); private final Object closeInProgressLock = new Object();
@ -275,6 +278,39 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
return this.channelWrapper.udt() != null; return this.channelWrapper.udt() != null;
} }
@Override
public
void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
// needed to place back-pressure when writing too much data to the connection
if (writableSignalNeeded.getAndSet(false)) {
synchronized (writableLock) {
writableLock.notifyAll();
}
}
}
/**
* needed to place back-pressure when writing too much data to the connection.
*
* This blocks until we are writable again
*/
private
void controlBackPressure(ConnectionPointWriter c) {
if (!c.isWritable()) {
writableSignalNeeded.set(true);
synchronized (writableLock) {
try {
writableLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/** /**
* Expose methods to send objects to a destination. * Expose methods to send objects to a destination.
*/ */
@ -309,6 +345,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
logger2.trace("Sending TCP {}", message); logger2.trace("Sending TCP {}", message);
} }
ConnectionPointWriter tcp = this.channelWrapper.tcp(); ConnectionPointWriter tcp = this.channelWrapper.tcp();
// needed to place back-pressure when writing too much data to the connection
controlBackPressure(tcp);
tcp.write(message); tcp.write(message);
return tcp; return tcp;
} }
@ -319,7 +358,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
// we have to return something, otherwise dependent code will throw a null pointer exception // we have to return something, otherwise dependent code will throw a null pointer exception
return ChannelNull.get(); return ChannelNull.get();
} }
} }
/** /**
@ -334,6 +372,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
logger2.trace("Sending UDP {}", message); logger2.trace("Sending UDP {}", message);
} }
ConnectionPointWriter udp = this.channelWrapper.udp(); ConnectionPointWriter udp = this.channelWrapper.udp();
// needed to place back-pressure when writing too much data to the connection
controlBackPressure(udp);
udp.write(message); udp.write(message);
return udp; return udp;
} }
@ -358,6 +399,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection,
logger2.trace("Sending UDT {}", message); logger2.trace("Sending UDT {}", message);
} }
ConnectionPointWriter udt = this.channelWrapper.udt(); ConnectionPointWriter udt = this.channelWrapper.udt();
// needed to place back-pressure when writing too much data to the connection
controlBackPressure(udt);
udt.write(message); udt.write(message);
return udt; return udt;
} }

View File

@ -19,9 +19,9 @@ public
interface ConnectionPoint { interface ConnectionPoint {
/** /**
* Waits for the last write to complete. Useful when sending large amounts of data at once. * @return true if the channel is writable. Useful when sending large amounts of data at once.
*/ */
void waitForWriteToComplete(); boolean isWritable();
/** /**
* Flushes the contents of the TCP/UDP/UDT/etc pipes to the wire. * Flushes the contents of the TCP/UDP/UDT/etc pipes to the wire.

View File

@ -34,11 +34,11 @@ class ServerConnectionBridge<C extends Connection>
/** /**
* Not implemented, since this would cause horrendous problems. * Not implemented, since this would cause horrendous problems.
* *
* @see dorkbox.network.connection.ConnectionPoint#waitForWriteToComplete() * @see dorkbox.network.connection.ConnectionPoint#isWritable()
*/ */
@Override @Override
public public
void waitForWriteToComplete() { boolean isWritable() {
throw new UnsupportedOperationException("Method not implemented"); throw new UnsupportedOperationException("Method not implemented");
} }

View File

@ -50,10 +50,14 @@ class ChannelLocalWrapper<C extends Connection> implements ChannelWrapper<C>, Co
this.shouldFlush.set(true); this.shouldFlush.set(true);
} }
/**
* @return true if the channel is writable. Useful when sending large amounts of data at once.
*/
@Override @Override
public public
void waitForWriteToComplete() { boolean isWritable() {
// it's immediate, since it's in the same JVM. // it's immediate, since it's in the same JVM.
return true;
} }
@Override @Override

View File

@ -17,7 +17,7 @@ package dorkbox.network.connection.wrapper;
import dorkbox.network.connection.ConnectionPointWriter; import dorkbox.network.connection.ConnectionPointWriter;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -26,60 +26,51 @@ class ChannelNetwork implements ConnectionPointWriter {
private final Channel channel; private final Channel channel;
private final AtomicBoolean shouldFlush = new AtomicBoolean(false); private final AtomicBoolean shouldFlush = new AtomicBoolean(false);
private final ChannelPromise voidPromise;
private volatile ChannelFuture lastWriteFuture;
public public
ChannelNetwork(Channel channel) { ChannelNetwork(Channel channel) {
this.channel = channel; this.channel = channel;
voidPromise = channel.voidPromise();
} }
/** /**
* Write an object to the underlying channel * Write an object to the underlying channel. If the underlying channel is NOT writable, this will block unit it is writable
*/ */
@Override @Override
public public
void write(Object object) { void write(Object object) {
this.lastWriteFuture = this.channel.write(object); // we don't care, or want to save the future. This is so GC is less.
this.shouldFlush.set(true); channel.write(object, voidPromise);
shouldFlush.set(true);
} }
/** /**
* Waits for the last write to complete. Useful when sending large amounts of data at once. * @return true if the channel is writable. Useful when sending large amounts of data at once.
* <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b>
*/ */
@Override @Override
public public
void waitForWriteToComplete() { boolean isWritable() {
if (this.lastWriteFuture != null) { return channel.isWritable();
this.lastWriteFuture.awaitUninterruptibly();
}
} }
@Override @Override
public public
void flush() { void flush() {
if (this.shouldFlush.compareAndSet(true, false)) { if (shouldFlush.compareAndSet(true, false)) {
this.channel.flush(); channel.flush();
} }
} }
public public
void close(long maxShutdownWaitTimeInMilliSeconds) { void close(long maxShutdownWaitTimeInMilliSeconds) {
// Wait until all messages are flushed before closing the channel. shouldFlush.set(false);
if (this.lastWriteFuture != null) { channel.close()
this.lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
this.lastWriteFuture = null;
}
this.shouldFlush.set(false);
this.channel.close()
.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds);
} }
public public
int id() { int id() {
return this.channel.hashCode(); return channel.hashCode();
} }
} }

View File

@ -41,12 +41,12 @@ class ChannelNull implements ConnectionPointWriter {
} }
/** /**
* Waits for the last write to complete. Useful when sending large amounts of data at once. * @return true if the channel is writable. Useful when sending large amounts of data at once.
* <b>DO NOT use this in the same thread as receiving messages! It will deadlock.</b>
*/ */
@Override @Override
public public
void waitForWriteToComplete() { boolean isWritable() {
return true;
} }
@Override @Override