diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java index 849acea3..be2409a8 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionImpl.java @@ -66,6 +66,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, private final org.slf4j.Logger logger; + private final AtomicBoolean writableSignalNeeded = new AtomicBoolean(false); + private final Object writableLock = new Object(); + private final AtomicBoolean closeInProgress = new AtomicBoolean(false); private final AtomicBoolean alreadyClosed = new AtomicBoolean(false); private final Object closeInProgressLock = new Object(); @@ -275,6 +278,39 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, return this.channelWrapper.udt() != null; } + @Override + public + void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + + // needed to place back-pressure when writing too much data to the connection + if (writableSignalNeeded.getAndSet(false)) { + synchronized (writableLock) { + writableLock.notifyAll(); + } + } + } + + /** + * needed to place back-pressure when writing too much data to the connection. + * + * This blocks until we are writable again + */ + private + void controlBackPressure(ConnectionPointWriter c) { + if (!c.isWritable()) { + writableSignalNeeded.set(true); + + synchronized (writableLock) { + try { + writableLock.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + /** * Expose methods to send objects to a destination. */ @@ -309,6 +345,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, logger2.trace("Sending TCP {}", message); } ConnectionPointWriter tcp = this.channelWrapper.tcp(); + + // needed to place back-pressure when writing too much data to the connection + controlBackPressure(tcp); tcp.write(message); return tcp; } @@ -319,7 +358,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, // we have to return something, otherwise dependent code will throw a null pointer exception return ChannelNull.get(); } - } /** @@ -334,6 +372,9 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, logger2.trace("Sending UDP {}", message); } ConnectionPointWriter udp = this.channelWrapper.udp(); + + // needed to place back-pressure when writing too much data to the connection + controlBackPressure(udp); udp.write(message); return udp; } @@ -358,6 +399,8 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection, logger2.trace("Sending UDT {}", message); } ConnectionPointWriter udt = this.channelWrapper.udt(); + // needed to place back-pressure when writing too much data to the connection + controlBackPressure(udt); udt.write(message); return udt; } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionPoint.java b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionPoint.java index 83db9e1b..5d5ef8fd 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ConnectionPoint.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ConnectionPoint.java @@ -19,9 +19,9 @@ public interface ConnectionPoint { /** - * Waits for the last write to complete. Useful when sending large amounts of data at once. + * @return true if the channel is writable. Useful when sending large amounts of data at once. */ - void waitForWriteToComplete(); + boolean isWritable(); /** * Flushes the contents of the TCP/UDP/UDT/etc pipes to the wire. diff --git a/Dorkbox-Network/src/dorkbox/network/connection/ServerConnectionBridge.java b/Dorkbox-Network/src/dorkbox/network/connection/ServerConnectionBridge.java index 237fcb5b..56df89c7 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/ServerConnectionBridge.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/ServerConnectionBridge.java @@ -34,11 +34,11 @@ class ServerConnectionBridge /** * Not implemented, since this would cause horrendous problems. * - * @see dorkbox.network.connection.ConnectionPoint#waitForWriteToComplete() + * @see dorkbox.network.connection.ConnectionPoint#isWritable() */ @Override public - void waitForWriteToComplete() { + boolean isWritable() { throw new UnsupportedOperationException("Method not implemented"); } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java index e089734a..9e138d2d 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java @@ -50,10 +50,14 @@ class ChannelLocalWrapper implements ChannelWrapper, Co this.shouldFlush.set(true); } + /** + * @return true if the channel is writable. Useful when sending large amounts of data at once. + */ @Override public - void waitForWriteToComplete() { + boolean isWritable() { // it's immediate, since it's in the same JVM. + return true; } @Override diff --git a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNetwork.java b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNetwork.java index 87510cf3..ebda3ef1 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNetwork.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNetwork.java @@ -17,7 +17,7 @@ package dorkbox.network.connection.wrapper; import dorkbox.network.connection.ConnectionPointWriter; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,60 +26,51 @@ class ChannelNetwork implements ConnectionPointWriter { private final Channel channel; private final AtomicBoolean shouldFlush = new AtomicBoolean(false); - - private volatile ChannelFuture lastWriteFuture; - + private final ChannelPromise voidPromise; public ChannelNetwork(Channel channel) { this.channel = channel; + voidPromise = channel.voidPromise(); } /** - * Write an object to the underlying channel + * Write an object to the underlying channel. If the underlying channel is NOT writable, this will block unit it is writable */ @Override public void write(Object object) { - this.lastWriteFuture = this.channel.write(object); - this.shouldFlush.set(true); + // we don't care, or want to save the future. This is so GC is less. + channel.write(object, voidPromise); + shouldFlush.set(true); } /** - * Waits for the last write to complete. Useful when sending large amounts of data at once. - * DO NOT use this in the same thread as receiving messages! It will deadlock. + * @return true if the channel is writable. Useful when sending large amounts of data at once. */ @Override public - void waitForWriteToComplete() { - if (this.lastWriteFuture != null) { - this.lastWriteFuture.awaitUninterruptibly(); - } + boolean isWritable() { + return channel.isWritable(); } @Override public void flush() { - if (this.shouldFlush.compareAndSet(true, false)) { - this.channel.flush(); + if (shouldFlush.compareAndSet(true, false)) { + channel.flush(); } } public void close(long maxShutdownWaitTimeInMilliSeconds) { - // Wait until all messages are flushed before closing the channel. - if (this.lastWriteFuture != null) { - this.lastWriteFuture.awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); - this.lastWriteFuture = null; - } - - this.shouldFlush.set(false); - this.channel.close() - .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); + shouldFlush.set(false); + channel.close() + .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); } public int id() { - return this.channel.hashCode(); + return channel.hashCode(); } } diff --git a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNull.java b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNull.java index 5370fec4..263739aa 100644 --- a/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNull.java +++ b/Dorkbox-Network/src/dorkbox/network/connection/wrapper/ChannelNull.java @@ -41,12 +41,12 @@ class ChannelNull implements ConnectionPointWriter { } /** - * Waits for the last write to complete. Useful when sending large amounts of data at once. - * DO NOT use this in the same thread as receiving messages! It will deadlock. + * @return true if the channel is writable. Useful when sending large amounts of data at once. */ @Override public - void waitForWriteToComplete() { + boolean isWritable() { + return true; } @Override