Cleaned up Kryo names + exceptions

This commit is contained in:
nathan 2019-06-13 20:32:12 +02:00
parent 7b74a730ba
commit b4ff95b496
10 changed files with 54 additions and 120 deletions

View File

@ -26,8 +26,8 @@ import org.slf4j.Logger;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.pipeline.tcp.KryoEncoder;
import dorkbox.network.pipeline.tcp.KryoEncoderCrypto;
import dorkbox.network.pipeline.tcp.KryoEncoderTcp;
import dorkbox.network.pipeline.tcp.KryoEncoderTcpCrypto;
import dorkbox.network.pipeline.udp.KryoDecoderUdp;
import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto;
import dorkbox.network.pipeline.udp.KryoEncoderUdp;
@ -52,8 +52,8 @@ class RegistrationWrapper {
private final org.slf4j.Logger logger;
public final KryoEncoder kryoTcpEncoder;
public final KryoEncoderCrypto kryoTcpEncoderCrypto;
public final KryoEncoderTcp kryoTcpEncoder;
public final KryoEncoderTcpCrypto kryoTcpEncoderCrypto;
public final KryoEncoderUdp kryoUdpEncoder;
public final KryoEncoderUdpCrypto kryoUdpEncoderCrypto;
@ -71,8 +71,8 @@ class RegistrationWrapper {
this.endPoint = endPoint;
this.logger = logger;
this.kryoTcpEncoder = new KryoEncoder(endPoint.serializationManager);
this.kryoTcpEncoderCrypto = new KryoEncoderCrypto(endPoint.serializationManager);
this.kryoTcpEncoder = new KryoEncoderTcp(endPoint.serializationManager);
this.kryoTcpEncoderCrypto = new KryoEncoderTcpCrypto(endPoint.serializationManager);
this.kryoUdpEncoder = new KryoEncoderUdp(endPoint.serializationManager);

View File

@ -30,8 +30,8 @@ import dorkbox.network.connection.registration.ConnectionRegistrationImpl;
import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.network.connection.registration.Registration;
import dorkbox.network.connection.registration.RegistrationHandler;
import dorkbox.network.pipeline.tcp.KryoDecoder;
import dorkbox.network.pipeline.tcp.KryoDecoderCrypto;
import dorkbox.network.pipeline.tcp.KryoDecoderTcp;
import dorkbox.network.pipeline.tcp.KryoDecoderTcpCrypto;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.crypto.CryptoECC;
import io.netty.channel.Channel;
@ -99,7 +99,7 @@ class RegistrationRemoteHandler extends RegistrationHandler {
// DECODE (or upstream)
///////////////////////
pipeline.addFirst(FRAME_AND_KRYO_DECODER,
new KryoDecoder(this.serializationManager)); // cannot be shared because of possible fragmentation.
new KryoDecoderTcp(this.serializationManager)); // cannot be shared because of possible fragmentation.
}
else if (isUdpChannel) {
// can be shared because there cannot be fragmentation for our UDP packets. If there is, we throw an error and continue...
@ -230,7 +230,7 @@ class RegistrationRemoteHandler extends RegistrationHandler {
if (metaChannel.tcpChannel == channel) {
pipeline.replace(FRAME_AND_KRYO_DECODER,
FRAME_AND_KRYO_CRYPTO_DECODER,
new KryoDecoderCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
new KryoDecoderTcpCrypto(this.serializationManager)); // cannot be shared because of possible fragmentation.
}
if (metaChannel.udpChannel == channel) {

View File

@ -25,16 +25,15 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public
class KryoDecoder extends ByteToMessageDecoder {
class KryoDecoderTcp extends ByteToMessageDecoder {
private final NetworkSerializationManager serializationManager;
public
KryoDecoder(final NetworkSerializationManager serializationManager) {
KryoDecoderTcp(final NetworkSerializationManager serializationManager) {
super();
this.serializationManager = serializationManager;
}
@SuppressWarnings("unused")
protected
Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
@ -165,8 +164,8 @@ class KryoDecoder extends ByteToMessageDecoder {
try {
object = readObject(serializationManager, context, in, length);
out.add(object);
} catch (Exception ex) {
context.fireExceptionCaught(new IOException("Unable to deserialize object for " + this.getClass(), ex));
} catch (Exception e) {
context.fireExceptionCaught(new IOException("Unable to deserialize object!", e));
}
}
}

View File

@ -23,26 +23,18 @@ import io.netty.channel.ChannelHandlerContext;
// on client this is MessageToMessage (because of the UdpDecoder in the pipeline!)
public
class KryoDecoderCrypto extends KryoDecoder {
class KryoDecoderTcpCrypto extends KryoDecoderTcp {
public
KryoDecoderCrypto(final NetworkSerializationManager serializationManager) {
KryoDecoderTcpCrypto(final NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
Object readObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context,
final ByteBuf in,
final int length) throws Exception {
try {
Connection_ connection = (Connection_) context.pipeline()
.last();
return serializationManager.readWithCrypto(connection, in, length);
} catch (Exception e) {
throw e;
}
final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception {
Connection_ connection = (Connection_) context.pipeline().last();
return serializationManager.readWithCrypto(connection, in, length);
}
}

View File

@ -26,20 +26,19 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable
public
class KryoEncoder extends MessageToByteEncoder<Object> {
class KryoEncoderTcp extends MessageToByteEncoder<Object> {
// maximum size of length field. Un-optimized will always be 4, but optimized version can take from 1 - 4 (for 0-Integer.MAX_VALUE).
private static final int reservedLengthIndex = 4;
private final NetworkSerializationManager serializationManager;
// When this is a UDP encode, there are ALREADY size limits placed on the buffer, so any extra checks are unnecessary
public
KryoEncoder(final NetworkSerializationManager serializationManager) {
KryoEncoderTcp(final NetworkSerializationManager serializationManager) {
super(true); // just use direct buffers anyways. When using Heap buffers, they because chunked and the backing array is invalid.
this.serializationManager = serializationManager;
}
// the crypto writer will override this
@SuppressWarnings("unused")
protected
void writeObject(final NetworkSerializationManager kryoWrapper,
final ChannelHandlerContext context,
@ -52,16 +51,16 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
@Override
protected
void encode(final ChannelHandlerContext context, final Object msg, final ByteBuf out) throws Exception {
void encode(final ChannelHandlerContext context, final Object message, final ByteBuf out) throws Exception {
// we don't necessarily start at 0!!
// START at index = 4. This is to make room for the integer placed by the frameEncoder for TCP.
int startIndex = out.writerIndex() + reservedLengthIndex;
if (msg != null) {
if (message != null) {
out.writerIndex(startIndex);
try {
writeObject(this.serializationManager, context, msg, out);
writeObject(this.serializationManager, context, message, out);
int index = out.writerIndex();
// now set the frame length
@ -80,8 +79,8 @@ class KryoEncoder extends MessageToByteEncoder<Object> {
// newIndex is actually where we want to start reading the data as well when written to the socket
out.setIndex(indexForLength, index);
} catch (Exception ex) {
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + msg.getClass().getName(), ex));
} catch (Exception e) {
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + message.getClass().getName(), e));
}
}
}

View File

@ -25,22 +25,18 @@ import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoEncoderCrypto extends KryoEncoder {
class KryoEncoderTcpCrypto extends KryoEncoderTcp {
public
KryoEncoderCrypto(final NetworkSerializationManager serializationManager) {
KryoEncoderTcpCrypto(final NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context,
final Object msg,
final ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) context.pipeline()
.last();
final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) context.pipeline().last();
serializationManager.writeWithCrypto(connection, buffer, msg);
}
}

View File

@ -18,8 +18,6 @@ package dorkbox.network.pipeline.udp;
import java.io.IOException;
import java.util.List;
import org.slf4j.LoggerFactory;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
@ -40,6 +38,12 @@ class KryoDecoderUdp extends MessageToMessageDecoder<Object> {
this.serializationManager = serializationManager;
}
protected
Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
return serializationManager.read(in, length);
}
@Override
public
boolean acceptInboundMessage(final Object msg) throws Exception {
@ -76,14 +80,9 @@ class KryoDecoderUdp extends MessageToMessageDecoder<Object> {
ByteBuf data = (ByteBuf) ((AddressedEnvelope) message).content();
try {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
Object object = serializationManager.read(data, data.writerIndex());
out.add(object);
out.add(readObject(serializationManager, context, data, data.writerIndex()));
} catch (IOException e) {
String msg = "Unable to deserialize object";
LoggerFactory.getLogger(this.getClass())
.error(msg, e);
throw new IOException(msg, e);
context.fireExceptionCaught(new IOException("Unable to deserialize object!", e));
}
}
}

View File

@ -15,71 +15,24 @@
*/
package dorkbox.network.pipeline.udp;
import java.io.IOException;
import java.util.List;
import org.slf4j.LoggerFactory;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@Sharable
public
class KryoDecoderUdpCrypto extends MessageToMessageDecoder<DatagramPacket> {
private final NetworkSerializationManager serializationManager;
class KryoDecoderUdpCrypto extends KryoDecoderUdp {
public
KryoDecoderUdpCrypto(NetworkSerializationManager serializationManager) {
this.serializationManager = serializationManager;
super(serializationManager);
}
/**
* Invoked when a {@link Channel} has been idle for a while.
*/
@Override
public
void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception {
// if (e.getState() == IdleState.READER_IDLE) {
// e.getChannel().close();
// } else if (e.getState() == IdleState.WRITER_IDLE) {
// e.getChannel().write(new Object());
// } else
if (event instanceof IdleStateEvent) {
if (((IdleStateEvent) event).state() == IdleState.ALL_IDLE) {
// will auto-flush if necessary
// TODO: if we have been idle TOO LONG, then we close this channel!
// if we are idle for a much smaller amount of time, then we pass the idle message up to the connection?
// this.sessionManager.onIdle(this);
}
}
super.userEventTriggered(context, event);
}
@Override
public
void decode(ChannelHandlerContext context, DatagramPacket in, List<Object> out) throws Exception {
try {
Connection_ last = (Connection_) context.pipeline()
.last();
ByteBuf data = in.content();
Object object = serializationManager.readWithCrypto(last, data, data.readableBytes());
out.add(object);
} catch (IOException e) {
String message = "Unable to deserialize object";
LoggerFactory.getLogger(this.getClass())
.error(message, e);
throw new IOException(message, e);
}
protected
Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception {
Connection_ connection = (Connection_) context.pipeline().last();
return serializationManager.readWithCrypto(connection, in, length);
}
}

View File

@ -44,6 +44,13 @@ class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
this.serializationManager = serializationManager;
}
// the crypto writer will override this
protected
void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) throws IOException {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
serializationManager.write(buffer, msg);
}
@Override
protected
void encode(ChannelHandlerContext context, Object message, List<Object> out) throws Exception {
@ -70,18 +77,8 @@ class KryoEncoderUdp extends MessageToMessageEncoder<Object> {
.remoteAddress());
out.add(packet);
} catch (Exception e) {
String msg = "Unable to serialize object of type: " + message.getClass()
.getName();
LoggerFactory.getLogger(this.getClass())
.error(msg, e);
throw new IOException(msg, e);
context.fireExceptionCaught(new IOException("Unable to serialize object of type: " + message.getClass().getName(), e));
}
}
}
// the crypto writer will override this
void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) throws IOException {
// no connection here because we haven't created one yet. When we do, we replace this handler with a new one.
serializationManager.write(buffer, msg);
}
}

View File

@ -33,10 +33,9 @@ class KryoEncoderUdpCrypto extends KryoEncoderUdp {
}
@Override
protected
void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException {
Connection_ last = (Connection_) ctx.pipeline()
.last();
Connection_ last = (Connection_) ctx.pipeline().last();
serializationManager.writeWithCrypto(last, buffer, msg);
}
}