diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index cb64606e..d74b52e7 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -26,8 +26,8 @@ import org.slf4j.Logger; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; -import dorkbox.network.pipeline.tcp.KryoEncoder; -import dorkbox.network.pipeline.tcp.KryoEncoderCrypto; +import dorkbox.network.pipeline.tcp.KryoEncoderTcp; +import dorkbox.network.pipeline.tcp.KryoEncoderTcpCrypto; import dorkbox.network.pipeline.udp.KryoDecoderUdp; import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto; import dorkbox.network.pipeline.udp.KryoEncoderUdp; @@ -52,8 +52,8 @@ class RegistrationWrapper { private final org.slf4j.Logger logger; - public final KryoEncoder kryoTcpEncoder; - public final KryoEncoderCrypto kryoTcpEncoderCrypto; + public final KryoEncoderTcp kryoTcpEncoder; + public final KryoEncoderTcpCrypto kryoTcpEncoderCrypto; public final KryoEncoderUdp kryoUdpEncoder; public final KryoEncoderUdpCrypto kryoUdpEncoderCrypto; @@ -71,8 +71,8 @@ class RegistrationWrapper { this.endPoint = endPoint; this.logger = logger; - this.kryoTcpEncoder = new KryoEncoder(endPoint.serializationManager); - this.kryoTcpEncoderCrypto = new KryoEncoderCrypto(endPoint.serializationManager); + this.kryoTcpEncoder = new KryoEncoderTcp(endPoint.serializationManager); + this.kryoTcpEncoderCrypto = new KryoEncoderTcpCrypto(endPoint.serializationManager); this.kryoUdpEncoder = new KryoEncoderUdp(endPoint.serializationManager); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 61eb8848..01b1a62b 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -30,8 +30,8 @@ import dorkbox.network.connection.registration.ConnectionRegistrationImpl; import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.registration.RegistrationHandler; -import dorkbox.network.pipeline.tcp.KryoDecoder; -import dorkbox.network.pipeline.tcp.KryoDecoderCrypto; +import dorkbox.network.pipeline.tcp.KryoDecoderTcp; +import dorkbox.network.pipeline.tcp.KryoDecoderTcpCrypto; import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.crypto.CryptoECC; import io.netty.channel.Channel; @@ -99,7 +99,7 @@ class RegistrationRemoteHandler extends RegistrationHandler { // DECODE (or upstream) /////////////////////// pipeline.addFirst(FRAME_AND_KRYO_DECODER, - new KryoDecoder(this.serializationManager)); // cannot be shared because of possible fragmentation. + new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation. } else if (isUdpChannel) { // can be shared because there cannot be fragmentation for our UDP packets. If there is, we throw an error and continue... @@ -230,7 +230,7 @@ class RegistrationRemoteHandler extends RegistrationHandler { if (metaChannel.tcpChannel == channel) { pipeline.replace(FRAME_AND_KRYO_DECODER, FRAME_AND_KRYO_CRYPTO_DECODER, - new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation. + new KryoDecoderTcpCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation. } if (metaChannel.udpChannel == channel) { diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoder.java b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcp.java similarity index 96% rename from src/dorkbox/network/pipeline/tcp/KryoDecoder.java rename to src/dorkbox/network/pipeline/tcp/KryoDecoderTcp.java index df022aa3..4a3e4c60 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoDecoder.java +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcp.java @@ -25,16 +25,15 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; public -class KryoDecoder extends ByteToMessageDecoder { +class KryoDecoderTcp extends ByteToMessageDecoder { private final NetworkSerializationManager serializationManager; public - KryoDecoder(final NetworkSerializationManager serializationManager) { + KryoDecoderTcp(final NetworkSerializationManager serializationManager) { super(); this.serializationManager = serializationManager; } - @SuppressWarnings("unused") protected Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. @@ -165,8 +164,8 @@ class KryoDecoder extends ByteToMessageDecoder { try { object = readObject(serializationManager, context, in, length); out.add(object); - } catch (Exception ex) { - context.fireExceptionCaught(new IOException("Unable to deserialize object for " + this.getClass(), ex)); + } catch (Exception e) { + context.fireExceptionCaught(new IOException("Unable to deserialize object!", e)); } } } diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCrypto.java similarity index 65% rename from src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java rename to src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCrypto.java index 582c2394..371dfee2 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCrypto.java @@ -23,26 +23,18 @@ import io.netty.channel.ChannelHandlerContext; // on client this is MessageToMessage (because of the UdpDecoder in the pipeline!) public -class KryoDecoderCrypto extends KryoDecoder { +class KryoDecoderTcpCrypto extends KryoDecoderTcp { public - KryoDecoderCrypto(final NetworkSerializationManager serializationManager) { + KryoDecoderTcpCrypto(final NetworkSerializationManager serializationManager) { super(serializationManager); } @Override protected Object readObject(final NetworkSerializationManager serializationManager, - final ChannelHandlerContext context, - final ByteBuf in, - final int length) throws Exception { - - try { - Connection_ connection = (Connection_) context.pipeline() - .last(); - return serializationManager.readWithCrypto(connection, in, length); - } catch (Exception e) { - throw e; - } + final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.readWithCrypto(connection, in, length); } } diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoder.java b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcp.java similarity index 90% rename from src/dorkbox/network/pipeline/tcp/KryoEncoder.java rename to src/dorkbox/network/pipeline/tcp/KryoEncoderTcp.java index 0e1fd105..f200f436 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoEncoder.java +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcp.java @@ -26,20 +26,19 @@ import io.netty.handler.codec.MessageToByteEncoder; @Sharable public -class KryoEncoder extends MessageToByteEncoder { +class KryoEncoderTcp extends MessageToByteEncoder { // maximum size of length field. Un-optimized will always be 4, but optimized version can take from 1 - 4 (for 0-Integer.MAX_VALUE). private static final int reservedLengthIndex = 4; private final NetworkSerializationManager serializationManager; // When this is a UDP encode, there are ALREADY size limits placed on the buffer, so any extra checks are unnecessary public - KryoEncoder(final NetworkSerializationManager serializationManager) { + KryoEncoderTcp(final NetworkSerializationManager serializationManager) { super(true); // just use direct buffers anyways. When using Heap buffers, they because chunked and the backing array is invalid. this.serializationManager = serializationManager; } // the crypto writer will override this - @SuppressWarnings("unused") protected void writeObject(final NetworkSerializationManager kryoWrapper, final ChannelHandlerContext context, @@ -52,16 +51,16 @@ class KryoEncoder extends MessageToByteEncoder { @Override protected - void encode(final ChannelHandlerContext context, final Object msg, final ByteBuf out) throws Exception { + void encode(final ChannelHandlerContext context, final Object message, final ByteBuf out) throws Exception { // we don't necessarily start at 0!! // START at index = 4. This is to make room for the integer placed by the frameEncoder for TCP. int startIndex = out.writerIndex() + reservedLengthIndex; - if (msg != null) { + if (message != null) { out.writerIndex(startIndex); try { - writeObject(this.serializationManager, context, msg, out); + writeObject(this.serializationManager, context, message, out); int index = out.writerIndex(); // now set the frame length @@ -80,8 +79,8 @@ class KryoEncoder extends MessageToByteEncoder { // newIndex is actually where we want to start reading the data as well when written to the socket out.setIndex(indexForLength, index); - } catch (Exception ex) { - context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass().getName(), ex)); + } catch (Exception e) { + context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + message.getClass().getName(), e)); } } } diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCrypto.java similarity index 77% rename from src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java rename to src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCrypto.java index 136136b3..0497bd7c 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCrypto.java @@ -25,22 +25,18 @@ import io.netty.channel.ChannelHandlerContext; @Sharable public -class KryoEncoderCrypto extends KryoEncoder { +class KryoEncoderTcpCrypto extends KryoEncoderTcp { public - KryoEncoderCrypto(final NetworkSerializationManager serializationManager) { + KryoEncoderTcpCrypto(final NetworkSerializationManager serializationManager) { super(serializationManager); } @Override protected void writeObject(final NetworkSerializationManager serializationManager, - final ChannelHandlerContext context, - final Object msg, - final ByteBuf buffer) throws IOException { - - Connection_ connection = (Connection_) context.pipeline() - .last(); + final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException { + Connection_ connection = (Connection_) context.pipeline().last(); serializationManager.writeWithCrypto(connection, buffer, msg); } } diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java index 8026e501..3284f98c 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java @@ -18,8 +18,6 @@ package dorkbox.network.pipeline.udp; import java.io.IOException; import java.util.List; -import org.slf4j.LoggerFactory; - import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; @@ -40,6 +38,12 @@ class KryoDecoderUdp extends MessageToMessageDecoder { this.serializationManager = serializationManager; } + protected + Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { + // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. + return serializationManager.read(in, length); + } + @Override public boolean acceptInboundMessage(final Object msg) throws Exception { @@ -76,14 +80,9 @@ class KryoDecoderUdp extends MessageToMessageDecoder { ByteBuf data = (ByteBuf) ((AddressedEnvelope) message).content(); try { - // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. - Object object = serializationManager.read(data, data.writerIndex()); - out.add(object); + out.add(readObject(serializationManager, context, data, data.writerIndex())); } catch (IOException e) { - String msg = "Unable to deserialize object"; - LoggerFactory.getLogger(this.getClass()) - .error(msg, e); - throw new IOException(msg, e); + context.fireExceptionCaught(new IOException("Unable to deserialize object!", e)); } } } diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java index 973da936..cc06b3f6 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java @@ -15,71 +15,24 @@ */ package dorkbox.network.pipeline.udp; -import java.io.IOException; -import java.util.List; - -import org.slf4j.LoggerFactory; - import dorkbox.network.connection.Connection_; import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.socket.DatagramPacket; -import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; @Sharable public -class KryoDecoderUdpCrypto extends MessageToMessageDecoder { - - private final NetworkSerializationManager serializationManager; +class KryoDecoderUdpCrypto extends KryoDecoderUdp { public KryoDecoderUdpCrypto(NetworkSerializationManager serializationManager) { - this.serializationManager = serializationManager; + super(serializationManager); } - /** - * Invoked when a {@link Channel} has been idle for a while. - */ - @Override - public - void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception { - // if (e.getState() == IdleState.READER_IDLE) { - // e.getChannel().close(); - // } else if (e.getState() == IdleState.WRITER_IDLE) { - // e.getChannel().write(new Object()); - // } else - if (event instanceof IdleStateEvent) { - if (((IdleStateEvent) event).state() == IdleState.ALL_IDLE) { - // will auto-flush if necessary - // TODO: if we have been idle TOO LONG, then we close this channel! - // if we are idle for a much smaller amount of time, then we pass the idle message up to the connection? - - // this.sessionManager.onIdle(this); - } - } - - super.userEventTriggered(context, event); - } - - @Override - public - void decode(ChannelHandlerContext context, DatagramPacket in, List out) throws Exception { - try { - Connection_ last = (Connection_) context.pipeline() - .last(); - ByteBuf data = in.content(); - Object object = serializationManager.readWithCrypto(last, data, data.readableBytes()); - out.add(object); - } catch (IOException e) { - String message = "Unable to deserialize object"; - LoggerFactory.getLogger(this.getClass()) - .error(message, e); - throw new IOException(message, e); - } + protected + Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.readWithCrypto(connection, in, length); } } diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java index 19f76f35..67870a82 100644 --- a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java @@ -44,6 +44,13 @@ class KryoEncoderUdp extends MessageToMessageEncoder { this.serializationManager = serializationManager; } + // the crypto writer will override this + protected + void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) throws IOException { + // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. + serializationManager.write(buffer, msg); + } + @Override protected void encode(ChannelHandlerContext context, Object message, List out) throws Exception { @@ -70,18 +77,8 @@ class KryoEncoderUdp extends MessageToMessageEncoder { .remoteAddress()); out.add(packet); } catch (Exception e) { - String msg = "Unable to serialize object of type: " + message.getClass() - .getName(); - LoggerFactory.getLogger(this.getClass()) - .error(msg, e); - throw new IOException(msg, e); + context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + message.getClass().getName(), e)); } } } - - // the crypto writer will override this - void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) throws IOException { - // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. - serializationManager.write(buffer, msg); - } } diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java index 22861fc8..c117cc08 100644 --- a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java @@ -33,10 +33,9 @@ class KryoEncoderUdpCrypto extends KryoEncoderUdp { } @Override + protected void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException { - - Connection_ last = (Connection_) ctx.pipeline() - .last(); + Connection_ last = (Connection_) ctx.pipeline().last(); serializationManager.writeWithCrypto(last, buffer, msg); } }