Crypto and compression is cleaner, uses Java native instead of bouncycastle

This commit is contained in:
nathan 2019-06-13 20:36:46 +02:00
parent a52c67671d
commit 6177912bb4
22 changed files with 782 additions and 568 deletions

View File

@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import dorkbox.network.Client; import dorkbox.network.Client;
import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.bridge.ConnectionBridge;
@ -192,26 +192,24 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
} }
/** /**
* @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation).
* because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't
* clobber each other
*/ */
@Override @Override
public final public final
ParametersWithIV getCryptoParameters() { SecretKey cryptoKey() {
return this.channelWrapper.cryptoParameters(); return this.channelWrapper.cryptoKey();
} }
/** /**
* This is the per-message sequence number. * This is the per-message sequence number.
* *
* The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
* The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
* counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
*/ */
@Override @Override
public final public final
long getNextGcmSequence() { long nextGcmSequence() {
return aes_gcm_iv.getAndIncrement(); return aes_gcm_iv.getAndIncrement();
} }

View File

@ -15,7 +15,7 @@
*/ */
package dorkbox.network.connection; package dorkbox.network.connection;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import dorkbox.network.rmi.ConnectionRmiSupport; import dorkbox.network.rmi.ConnectionRmiSupport;
@ -33,19 +33,17 @@ interface Connection_ extends Connection {
/** /**
* This is the per-message sequence number. * This is the per-message sequence number.
* *
* The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter)
* The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this
* counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small)
*/ */
long getNextGcmSequence(); long nextGcmSequence();
/** /**
* @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal * @return the AES key.
* because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't
* clobber each other
*/ */
ParametersWithIV getCryptoParameters(); SecretKey cryptoKey();
/** /**
* @return the endpoint associated with this connection * @return the endpoint associated with this connection

View File

@ -16,23 +16,26 @@
package dorkbox.network.connection; package dorkbox.network.connection;
import java.io.IOException; import java.io.IOException;
import java.security.SecureRandom;
import org.bouncycastle.crypto.engines.AESFastEngine; import javax.crypto.Cipher;
import org.bouncycastle.crypto.modes.GCMBlockCipher; import javax.crypto.SecretKey;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.spec.GCMParameterSpec;
import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.network.pipeline.ByteBufInput; import dorkbox.network.pipeline.ByteBufInput;
import dorkbox.network.pipeline.ByteBufOutput; import dorkbox.network.pipeline.ByteBufOutput;
import dorkbox.network.rmi.ConnectionRmiSupport; import dorkbox.network.rmi.ConnectionRmiSupport;
import dorkbox.network.rmi.RmiNopConnection; import dorkbox.network.rmi.RmiNopConnection;
import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.util.bytes.BigEndian; import dorkbox.util.OS;
import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.bytes.OptimizeUtilsByteArray;
import dorkbox.util.bytes.OptimizeUtilsByteBuf; import dorkbox.util.bytes.OptimizeUtilsByteBuf;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.ByteBufUtil;
import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor; import net.jpountz.lz4.LZ4FastDecompressor;
@ -43,6 +46,9 @@ import net.jpountz.lz4.LZ4FastDecompressor;
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public public
class KryoExtra extends Kryo { class KryoExtra extends Kryo {
private static final int ABSOLUTE_MAX_SIZE_OBJECT = EndPoint.udpMaxSize * 1000; // by default, this is about 500k
private static final boolean DEBUG = false;
private static final Connection_ NOP_CONNECTION = new RmiNopConnection(); private static final Connection_ NOP_CONNECTION = new RmiNopConnection();
// snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
@ -52,39 +58,31 @@ class KryoExtra extends Kryo {
private static final LZ4Factory factory = LZ4Factory.fastestInstance(); private static final LZ4Factory factory = LZ4Factory.fastestInstance();
// for kryo serialization // for kryo serialization
private final ByteBufInput reader = new ByteBufInput(); private final ByteBufInput readerBuff = new ByteBufInput();
private final ByteBufOutput writer = new ByteBufOutput(); private final ByteBufOutput writerBuff = new ByteBufOutput();
// crypto + compression have to work with native byte arrays, so here we go...
private final Input reader = new Input(ABSOLUTE_MAX_SIZE_OBJECT);
private final Output writer = new Output(ABSOLUTE_MAX_SIZE_OBJECT);
private final byte[] temp = new byte[ABSOLUTE_MAX_SIZE_OBJECT];
// volatile to provide object visibility for entire class. This is unique per connection // volatile to provide object visibility for entire class. This is unique per connection
public volatile ConnectionRmiSupport rmiSupport; public volatile ConnectionRmiSupport rmiSupport;
private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine()); private static final String ALGORITHM = "AES/GCM/NoPadding";
private static final int TAG_LENGTH_BIT = 128;
private static final int IV_LENGTH_BYTE = 12;
private final SecureRandom secureRandom = new SecureRandom();
private final Cipher cipher;
// writing data
private final ByteBuf tempBuffer = Unpooled.buffer(EndPoint.udpMaxSize);
private LZ4Compressor compressor = factory.fastCompressor(); private LZ4Compressor compressor = factory.fastCompressor();
private int inputArrayLength = -1;
private byte[] inputArray;
private int compressOutputLength = -1;
private byte[] compressOutput;
private int cryptoOutputLength = -1;
private byte[] cryptoOutput;
// reading data
private LZ4FastDecompressor decompressor = factory.fastDecompressor(); private LZ4FastDecompressor decompressor = factory.fastDecompressor();
private int decryptOutputLength = -1;
private byte[] decryptOutput;
private ByteBuf decryptBuf;
private int decompressOutputLength = -1;
private byte[] decompressOutput;
private ByteBuf decompressBuf;
private NetworkSerializationManager serializationManager; private NetworkSerializationManager serializationManager;
@ -93,499 +91,393 @@ class KryoExtra extends Kryo {
super(); super();
this.serializationManager = serializationManager; this.serializationManager = serializationManager;
try {
cipher = Cipher.getInstance(ALGORITHM);
} catch (Exception e) {
throw new IllegalStateException("could not get cipher instance", e);
}
} }
/**
* This is NOT ENCRYPTED
*/
public synchronized public synchronized
void write(final ByteBuf buffer, final Object message) throws IOException { void write(final ByteBuf buffer, final Object message) throws IOException {
// these will always be NULL during connection initialization write(NOP_CONNECTION, buffer, message);
this.rmiSupport = null; }
/**
* This is NOT ENCRYPTED
*/
public synchronized
void write(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.rmiSupport = connection.rmiSupport();
// write the object to the NORMAL output buffer! // write the object to the NORMAL output buffer!
writer.setBuffer(buffer); writerBuff.setBuffer(buffer);
writeClassAndObject(writerBuff, message);
}
/**
* This is NOT ENCRYPTED
*
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
public synchronized
Object read(final ByteBuf buffer) throws IOException {
return read(NOP_CONNECTION, buffer);
}
/**
* This is NOT ENCRYPTED
*
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*
*/
public synchronized
Object read(final Connection_ connection, final ByteBuf buffer) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.rmiSupport = connection.rmiSupport();
// read the object from the buffer.
readerBuff.setBuffer(buffer);
return readClassAndObject(readerBuff); // this properly sets the readerIndex, but only if it's the correct buffer
}
////////////////
////////////////
////////////////
// for more complicated writes, sadly, we have to deal DIRECTLY with byte arrays
////////////////
////////////////
////////////////
/**
* OUTPUT:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
private
void write(final Connection_ connection, final Output writer, final Object message) {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.rmiSupport = connection.rmiSupport();
// write the object to the NORMAL output buffer!
writer.reset();
writeClassAndObject(writer, message); writeClassAndObject(writer, message);
} }
public synchronized /**
Object read(final ByteBuf buffer) throws IOException { * INPUT:
// these will always be NULL during connection initialization * ++++++++++++++++++++++++++
this.rmiSupport = null; * + class and object bytes +
* ++++++++++++++++++++++++++
*/
private
Object read(final Connection_ connection, final Input reader) {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.rmiSupport = connection.rmiSupport();
//////////////// return readClassAndObject(reader);
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
////////////////
ByteBuf inputBuf = buffer;
// read the object from the buffer.
reader.setBuffer(inputBuf);
return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer
} }
/**
* This is NOT ENCRYPTED (and is only done on the loopback connection!)
*/
public synchronized public synchronized
void writeCompressed(final ByteBuf buffer, final Object message) throws IOException { void writeCompressed(final ByteBuf buffer, final Object message) throws IOException {
writeCompressed(NOP_CONNECTION, buffer, message); writeCompressed(NOP_CONNECTION, buffer, message);
} }
/** /**
* This is NOT ENCRYPTED (and is only done on the loopback connection!) * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* COMPRESSED DATA:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/ */
public synchronized public synchronized
void writeCompressed(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { void writeCompressed(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object // write the object to a TEMP buffer! this will be compressed later
this.rmiSupport = connection.rmiSupport(); write(connection, writer, message);
ByteBuf objectOutputBuffer = this.tempBuffer; // save off how much data the object took
objectOutputBuffer.clear(); // always have to reset everything int length = writer.position();
int maxCompressedLength = compressor.maxCompressedLength(length);
// write the object to a TEMP buffer! this will be compressed
writer.setBuffer(objectOutputBuffer);
writeClassAndObject(writer, message);
// save off how much data the object took + magic byte
int length = objectOutputBuffer.writerIndex();
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because
// the buffer might be a slice of other buffer or a pooled buffer:
//noinspection Duplicates
if (objectOutputBuffer.hasArray() &&
objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) &&
objectOutputBuffer.array().length == objectOutputBuffer.capacity()) {
// we can use it...
inputArray = objectOutputBuffer.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = objectOutputBuffer.arrayOffset();
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
////////// compressing data ////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// output), will be negated by the increase in size by the encryption // output), will be negated by the increase in size by the encryption
byte[] compressOutput = temp;
byte[] compressOutput = this.compressOutput; // LZ4 compress.
int compressedLength = compressor.compress(writer.getBuffer(), 0, length, compressOutput, 0, maxCompressedLength);
int maxLengthLengthOffset = 4; // length is never negative, so 4 is OK (5 means it's negative) if (DEBUG) {
int maxCompressedLength = compressor.maxCompressedLength(length); String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
String compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength);
// add 4 so there is room to write the compressed size to the buffer System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset; OS.LINE_SEPARATOR +
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
// lazy initialize the compression output buffer
if (maxCompressedLengthWithOffset > compressOutputLength) {
compressOutputLength = maxCompressedLengthWithOffset;
compressOutput = new byte[maxCompressedLengthWithOffset];
this.compressOutput = compressOutput;
} }
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// LZ4 compress. output offset max 4 bytes to leave room for length of tempOutput data OptimizeUtilsByteBuf.writeInt(buffer, length, true);
int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength);
// bytes can now be written to, because our compressed data is stored in a temp array.
final int lengthLength = OptimizeUtilsByteArray.intLength(length, true);
// correct input. compression output is now buffer input
inputArray = compressOutput;
inputOffset = maxLengthLengthOffset - lengthLength;
// now write the ORIGINAL (uncompressed) length to the front of the byte array (this is NOT THE BUFFER!). This is so we can use the FAST decompress version
OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset);
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
buffer.writeBytes(inputArray, inputOffset, compressedLength + lengthLength); buffer.writeBytes(compressOutput, 0, compressedLength);
} }
/**
* This is NOT ENCRYPTED (and is only done on the loopback connection!)
*/
public public
Object readCompressed(final ByteBuf buffer, int length) throws IOException { Object readCompressed(final ByteBuf buffer, int length) throws IOException {
return readCompressed(NOP_CONNECTION, buffer, length); return readCompressed(NOP_CONNECTION, buffer, length);
} }
/** /**
* This is NOT ENCRYPTED (and is only done on the loopback connection!) * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* COMPRESSED DATA:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/ */
public public
Object readCompressed(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { Object readCompressed(final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.rmiSupport = connection.rmiSupport();
//////////////// ////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
//////////////// ////////////////
ByteBuf inputBuf = buffer;
// get the decompressed length (at the beginning of the array) // get the decompressed length (at the beginning of the array)
final int uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true); final int uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true);
final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-5 bytes for the decompressed size if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) {
throw new IOException("Uncompressed size (" + uncompressedLength + ") is larger than max allowed size (" + ABSOLUTE_MAX_SIZE_OBJECT + ")!");
}
// have to adjust for uncompressed length // because 1-4 bytes for the decompressed size (this number is never negative)
final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true);
int start = buffer.readerIndex();
// have to adjust for uncompressed length-length
length = length - lengthLength; length = length - lengthLength;
///////// decompress data -- as it's ALWAYS compressed ///////// decompress data
buffer.getBytes(start, temp, 0, length);
buffer.readerIndex(length);
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because
// the buffer might be a slice of other buffer or a pooled buffer:
//noinspection Duplicates
if (inputBuf.hasArray() &&
inputBuf.array()[0] == inputBuf.getByte(0) &&
inputBuf.array().length == inputBuf.capacity()) {
// we can use it...
inputArray = inputBuf.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = inputBuf.arrayOffset() + lengthLength;
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
// have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index.
buffer.readerIndex(buffer.readerIndex() + length);
///////// decompress data -- as it's ALWAYS compressed
byte[] decompressOutputArray = this.decompressOutput;
if (uncompressedLength > decompressOutputLength) {
decompressOutputLength = uncompressedLength;
decompressOutputArray = new byte[uncompressedLength];
this.decompressOutput = decompressOutputArray;
decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo
}
inputBuf = decompressBuf;
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength); reader.reset();
decompressor.decompress(temp, 0, reader.getBuffer(), 0, uncompressedLength);
inputBuf.setIndex(0, uncompressedLength); reader.setLimit(uncompressedLength);
if (DEBUG) {
String compressed = ByteBufUtil.hexDump(buffer, start, length);
String orig = ByteBufUtil.hexDump(reader.getBuffer(), start, uncompressedLength);
System.err.println("COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR +
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
}
// read the object from the buffer. // read the object from the buffer.
reader.setBuffer(inputBuf); Object classAndObject = read(connection, reader);
return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer if (DEBUG) {
System.err.println("Read compress object" + classAndObject.getClass().getSimpleName());
}
return classAndObject;
} }
/**
* BUFFER:
* +++++++++++++++++++++++++++++++
* + IV (12) + encrypted data +
* +++++++++++++++++++++++++++++++
*
* ENCRYPTED DATA:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* COMPRESSED DATA:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
public synchronized public synchronized
void writeCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { void writeCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object // write the object to a TEMP buffer! this will be compressed later
this.rmiSupport = connection.rmiSupport(); write(connection, writer, message);
ByteBuf objectOutputBuffer = this.tempBuffer;
objectOutputBuffer.clear(); // always have to reset everything
// write the object to a TEMP buffer! this will be compressed
writer.setBuffer(objectOutputBuffer);
writeClassAndObject(writer, message);
// save off how much data the object took // save off how much data the object took
int length = objectOutputBuffer.writerIndex(); int length = writer.position();
int maxCompressedLength = compressor.maxCompressedLength(length);
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because
// the buffer might be a slice of other buffer or a pooled buffer:
//noinspection Duplicates
if (objectOutputBuffer.hasArray() &&
objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) &&
objectOutputBuffer.array().length == objectOutputBuffer.capacity()) {
// we can use it...
inputArray = objectOutputBuffer.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = objectOutputBuffer.arrayOffset();
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
////////// compressing data ////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// output), will be negated by the increase in size by the encryption // output), will be negated by the increase in size by the encryption
byte[] compressOutput = temp;
byte[] compressOutput = this.compressOutput; // LZ4 compress. Offset by 4 in the dest array so we have room for the length
int compressedLength = compressor.compress(writer.getBuffer(), 0, length, compressOutput, 4, maxCompressedLength);
int maxLengthLengthOffset = 4; // length is never negative, so 4 is OK (5 means it's negative) if (DEBUG) {
int maxCompressedLength = compressor.maxCompressedLength(length); String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
String compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength);
// add 4 so there is room to write the compressed size to the buffer System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset; OS.LINE_SEPARATOR +
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
// lazy initialize the compression output buffer
if (maxCompressedLengthWithOffset > compressOutputLength) {
compressOutputLength = maxCompressedLengthWithOffset;
compressOutput = new byte[maxCompressedLengthWithOffset];
this.compressOutput = compressOutput;
} }
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
// LZ4 compress. output offset max 4 bytes to leave room for length of tempOutput data
int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength);
// bytes can now be written to, because our compressed data is stored in a temp array.
final int lengthLength = OptimizeUtilsByteArray.intLength(length, true); final int lengthLength = OptimizeUtilsByteArray.intLength(length, true);
// correct input. compression output is now encryption input // this is where we start writing the length data, so that the end of this lines up with the compressed data
inputArray = compressOutput; int start = 4 - lengthLength;
inputOffset = maxLengthLengthOffset - lengthLength;
OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start);
// now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST decompress version // now compressOutput contains "uncompressed length + data"
OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset); int compressedArrayLength = lengthLength + compressedLength;
// correct length for encryption
length = compressedLength + lengthLength; // +1 to +4 for the uncompressed size bytes
/////// encrypting data. /////// encrypting data.
final long nextGcmSequence = connection.getNextGcmSequence(); final SecretKey cryptoKey = connection.cryptoKey();
// this is a threadlocal, so that we don't clobber other threads that are performing crypto on the same connection at the same time byte[] iv = new byte[IV_LENGTH_BYTE]; // NEVER REUSE THIS IV WITH SAME KEY
final ParametersWithIV cryptoParameters = connection.getCryptoParameters(); secureRandom.nextBytes(iv);
BigEndian.Long_.toBytes(nextGcmSequence, cryptoParameters.getIV(), 4); // put our counter into the IV
final GCMBlockCipher aes = this.aesEngine;
aes.reset();
aes.init(true, cryptoParameters);
byte[] cryptoOutput;
// lazy initialize the crypto output buffer
int cryptoSize = length + 16; // from: aes.getOutputSize(length);
// 'output' is the temp byte array
if (cryptoSize > cryptoOutputLength) {
cryptoOutputLength = cryptoSize;
cryptoOutput = new byte[cryptoSize];
this.cryptoOutput = cryptoOutput;
} else {
cryptoOutput = this.cryptoOutput;
}
int encryptedLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0);
GCMParameterSpec parameterSpec = new GCMParameterSpec(TAG_LENGTH_BIT, iv); // 128 bit auth tag length
try { try {
// authentication tag for GCM cipher.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec);
encryptedLength += aes.doFinal(cryptoOutput, encryptedLength);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Unable to AES encrypt the data", e); throw new IOException("Unable to AES encrypt the data", e);
} }
// write out our GCM counter // we REUSE the writer buffer! (since that data is now compressed in a different array)
OptimizeUtilsByteBuf.writeLong(buffer, nextGcmSequence, true);
int encryptedLength;
try {
encryptedLength = cipher.doFinal(compressOutput, start, compressedArrayLength, writer.getBuffer(), 0);
} catch (Exception e) {
throw new IOException("Unable to AES encrypt the data", e);
}
// write out our IV
buffer.writeBytes(iv, 0, IV_LENGTH_BYTE);
// have to copy over the orig data, because we used the temp buffer // have to copy over the orig data, because we used the temp buffer
buffer.writeBytes(cryptoOutput, 0, encryptedLength); buffer.writeBytes(writer.getBuffer(), 0, encryptedLength);
if (DEBUG) {
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
String crypto = ByteBufUtil.hexDump(writer.getBuffer(), 0, encryptedLength);
System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString +
OS.LINE_SEPARATOR +
"CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto);
}
} }
/**
* BUFFER:
* +++++++++++++++++++++++++++++++
* + IV (12) + encrypted data +
* +++++++++++++++++++++++++++++++
*
* ENCRYPTED DATA:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*
* COMPRESSED DATA:
* ++++++++++++++++++++++++++
* + class and object bytes +
* ++++++++++++++++++++++++++
*/
public public
Object readCrypto(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { Object readCrypto(final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object // read out the crypto IV
this.rmiSupport = connection.rmiSupport(); final byte[] iv = new byte[IV_LENGTH_BYTE];
buffer.readBytes(iv, 0 , IV_LENGTH_BYTE);
//////////////// // have to adjust for the IV
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! length = length - IV_LENGTH_BYTE;
////////////////
ByteBuf inputBuf = buffer; /////////// decrypt data
final SecretKey cryptoKey = connection.cryptoKey();
final long gcmIVCounter = OptimizeUtilsByteBuf.readLong(buffer, true);
int lengthLength = OptimizeUtilsByteArray.longLength(gcmIVCounter, true);
// have to adjust for the gcmIVCounter
length = length - lengthLength;
/////////// decrypting data
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because
// the buffer might be a slice of other buffer or a pooled buffer:
//noinspection Duplicates
if (inputBuf.hasArray() &&
inputBuf.array()[0] == inputBuf.getByte(0) &&
inputBuf.array().length == inputBuf.capacity()) {
// we can use it...
inputArray = inputBuf.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = inputBuf.arrayOffset() + lengthLength;
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
// have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index.
buffer.readerIndex(buffer.readerIndex() + length);
// this is a threadlocal, so that we don't clobber other threads that are performing crypto on the same connection at the same time
final ParametersWithIV cryptoParameters = connection.getCryptoParameters();
BigEndian.Long_.toBytes(gcmIVCounter, cryptoParameters.getIV(), 4); // put our counter into the IV
final GCMBlockCipher aes = this.aesEngine;
aes.reset();
aes.init(false, cryptoParameters);
int cryptoSize = length - 16; // from: aes.getOutputSize(length);
// lazy initialize the decrypt output buffer
byte[] decryptOutputArray;
if (cryptoSize > decryptOutputLength) {
decryptOutputLength = cryptoSize;
decryptOutputArray = new byte[cryptoSize];
this.decryptOutput = decryptOutputArray;
decryptBuf = Unpooled.wrappedBuffer(decryptOutputArray);
} else {
decryptOutputArray = this.decryptOutput;
}
int decryptedLength = aes.processBytes(inputArray, inputOffset, length, decryptOutputArray, 0);
try { try {
// authentication tag for GCM cipher.init(Cipher.DECRYPT_MODE, cryptoKey, new GCMParameterSpec(TAG_LENGTH_BIT, iv));
decryptedLength += aes.doFinal(decryptOutputArray, decryptedLength);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Unable to AES decrypt the data", e); throw new IOException("Unable to AES decrypt the data", e);
} }
// have to copy out bytes, we reuse the reader byte array!
buffer.readBytes(reader.getBuffer(), 0, length);
if (DEBUG) {
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
String crypto = ByteBufUtil.hexDump(reader.getBuffer(), 0, length);
System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString +
OS.LINE_SEPARATOR + "CRYPTO: (" + length + ")" + OS.LINE_SEPARATOR + crypto);
}
int decryptedLength;
try {
decryptedLength = cipher.doFinal(reader.getBuffer(),0, length, temp, 0);
} catch (Exception e) {
throw new IOException("Unable to AES decrypt the data", e);
}
///////// decompress data -- as it's ALWAYS compressed ///////// decompress data -- as it's ALWAYS compressed
// get the decompressed length (at the beginning of the array) // get the decompressed length (at the beginning of the array)
inputArray = decryptOutputArray; final int uncompressedLength = OptimizeUtilsByteArray.readInt(temp, true);
final int uncompressedLength = OptimizeUtilsByteArray.readInt(inputArray, true);
inputOffset = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-4 bytes for the decompressed size
// where does our data start, AFTER the length field
byte[] decompressOutputArray = this.decompressOutput; int start = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-4 bytes for the uncompressed size;
if (uncompressedLength > decompressOutputLength) {
decompressOutputLength = uncompressedLength;
decompressOutputArray = new byte[uncompressedLength];
this.decompressOutput = decompressOutputArray;
decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo
}
inputBuf = decompressBuf;
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor
decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength); reader.reset();
decompressor.decompress(temp, start, reader.getBuffer(), 0, uncompressedLength);
reader.setLimit(uncompressedLength);
inputBuf.setIndex(0, uncompressedLength); if (DEBUG) {
int endWithoutUncompressedLength = decryptedLength - start;
String compressed = ByteBufUtil.hexDump(temp, start, endWithoutUncompressedLength);
String orig = ByteBufUtil.hexDump(reader.getBuffer(), 0, uncompressedLength);
System.err.println("COMPRESSED: (" + endWithoutUncompressedLength + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR +
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
}
// read the object from the buffer. // read the object from the buffer.
reader.setBuffer(inputBuf); return read(connection, reader);
return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer
} }
@Override @Override
protected protected
void finalize() throws Throwable { void finalize() throws Throwable {
if (decompressBuf != null) { readerBuff.getByteBuf().release();
decompressBuf.release(); writerBuff.getByteBuf().release();
}
if (decryptBuf != null) {
decryptBuf.release();
}
super.finalize(); super.finalize();
} }
@ -594,5 +486,4 @@ class KryoExtra extends Kryo {
NetworkSerializationManager getSerializationManager() { NetworkSerializationManager getSerializationManager() {
return serializationManager; return serializationManager;
} }
} }

View File

@ -15,7 +15,7 @@
*/ */
package dorkbox.network.connection.registration; package dorkbox.network.connection.registration;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
@ -61,14 +61,14 @@ class ConnectionRegistrationImpl implements Connection_, ChannelHandler {
@Override @Override
public public
long getNextGcmSequence() { long nextGcmSequence() {
return connection.getNextGcmSequence(); return connection.nextGcmSequence();
} }
@Override @Override
public public
ParametersWithIV getCryptoParameters() { SecretKey cryptoKey() {
return connection.getCryptoParameters(); return connection.cryptoKey();
} }
@Override @Override

View File

@ -17,6 +17,8 @@ package dorkbox.network.connection.registration;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.params.ECPublicKeyParameters; import org.bouncycastle.crypto.params.ECPublicKeyParameters;
@ -44,9 +46,7 @@ class MetaChannel {
public volatile ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key. public volatile ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key.
public volatile AsymmetricCipherKeyPair ecdhKey; // used for ECC Diffie-Hellman-Merkle key exchanges: see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange public volatile AsymmetricCipherKeyPair ecdhKey; // used for ECC Diffie-Hellman-Merkle key exchanges: see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange
// since we are using AES-GCM, the aesIV here **MUST** be exactly 12 bytes public volatile SecretKey secretKey;
public volatile byte[] aesKey;
public volatile byte[] aesIV;
// indicates if the remote ECC key has changed for an IP address. If the client detects this, it will not connect. // indicates if the remote ECC key has changed for an IP address. If the client detects this, it will not connect.
// If the server detects this, it has the option for additional security (two-factor auth, perhaps?) // If the server detects this, it has the option for additional security (two-factor auth, perhaps?)

View File

@ -200,25 +200,6 @@ class RegistrationRemoteHandler extends RegistrationHandler {
protected abstract protected abstract
String getConnectionDirection(); String getConnectionDirection();
/**
* @return true if validation was successful
*/
final
boolean invalidAES(final MetaChannel metaChannel) {
if (metaChannel.aesKey.length != 32) {
logger.error("Fatal error trying to use AES key (wrong key length).");
return true;
}
// IV length must == 12 because we are using GCM!
else if (metaChannel.aesIV.length != 12) {
logger.error("Fatal error trying to use AES IV (wrong IV length).");
return true;
}
return false;
}
/** /**
* upgrades a channel ONE channel at a time * upgrades a channel ONE channel at a time
*/ */

View File

@ -21,6 +21,8 @@ import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import javax.crypto.spec.SecretKeySpec;
import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.BasicAgreement;
import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement;
import org.bouncycastle.crypto.digests.SHA384Digest; import org.bouncycastle.crypto.digests.SHA384Digest;
@ -99,7 +101,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
// IN: session ID + public key + ecc parameters (which are a nonce. the SERVER defines what these are) // IN: session ID + public key + ecc parameters (which are a nonce. the SERVER defines what these are)
// OUT: remote ECDH shared payload // OUT: remote ECDH shared payload
if (metaChannel.aesKey == null && registration.publicKey != null) { if (metaChannel.secretKey == null && registration.publicKey != null) {
// whoa! Didn't send valid public key info! // whoa! Didn't send valid public key info!
if (invalidPublicKey(registration, type)) { if (invalidPublicKey(registration, type)) {
shutdown(channel, registration.sessionID); shutdown(channel, registration.sessionID);
@ -133,7 +135,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
// IN: remote ECDH shared payload // IN: remote ECDH shared payload
// OUT: hasMore=true if we have more registrations to do, false otherwise // OUT: hasMore=true if we have more registrations to do, false otherwise
if (metaChannel.aesKey == null) { if (metaChannel.secretKey == null) {
/* /*
* Diffie-Hellman-Merkle key exchange for the AES key * Diffie-Hellman-Merkle key exchange for the AES key
* http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange * http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange
@ -161,14 +163,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler {
sha384.update(keySeed, 0, keySeed.length); sha384.update(keySeed, 0, keySeed.length);
sha384.doFinal(digest, 0); sha384.doFinal(digest, 0);
metaChannel.aesKey = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) byte[] key = org.bouncycastle.util.Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes)
metaChannel.aesIV = Arrays.copyOfRange(digest, 32, 44); // 96bit blocksize (12 bytes) required by AES-GCM metaChannel.secretKey = new SecretKeySpec(key, "AES");
if (invalidAES(metaChannel)) {
// abort if something messed up!
shutdown(channel, registration.sessionID);
return;
}
Registration outboundRegister = new Registration(metaChannel.sessionId); Registration outboundRegister = new Registration(metaChannel.sessionId);

View File

@ -20,6 +20,8 @@ import java.net.InetSocketAddress;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.crypto.spec.SecretKeySpec;
import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.AsymmetricCipherKeyPair;
import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.BasicAgreement;
import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement;
@ -123,7 +125,7 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
// IN: remote ECDH shared payload // IN: remote ECDH shared payload
// OUT: server ECDH shared payload // OUT: server ECDH shared payload
if (metaChannel.aesKey == null) { if (metaChannel.secretKey == null) {
/* /*
* Diffie-Hellman-Merkle key exchange for the AES key * Diffie-Hellman-Merkle key exchange for the AES key
* http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange * http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange
@ -157,14 +159,8 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler {
sha384.update(keySeed, 0, keySeed.length); sha384.update(keySeed, 0, keySeed.length);
sha384.doFinal(digest, 0); sha384.doFinal(digest, 0);
metaChannel.aesKey = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) byte[] key = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes)
metaChannel.aesIV = Arrays.copyOfRange(digest, 32, 44); // 96bit blocksize (12 bytes) required by AES-GCM metaChannel.secretKey = new SecretKeySpec(key, "AES");
if (invalidAES(metaChannel)) {
// abort if something messed up!
shutdown(channel, registration.sessionID);
return;
}
Registration outboundRegister = new Registration(metaChannel.sessionId); Registration outboundRegister = new Registration(metaChannel.sessionId);

View File

@ -17,7 +17,8 @@ package dorkbox.network.connection.wrapper;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
@ -30,12 +31,14 @@ import io.netty.util.concurrent.Promise;
public public
class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint { class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
private static final SecretKey dummyCryptoKey = new SecretKeySpec(new byte[32], "AES");
private final Channel channel; private final Channel channel;
private final AtomicBoolean shouldFlush = new AtomicBoolean(false); private final AtomicBoolean shouldFlush = new AtomicBoolean(false);
private String remoteAddress; private String remoteAddress;
public public
ChannelLocalWrapper(MetaChannel metaChannel) { ChannelLocalWrapper(MetaChannel metaChannel) {
this.channel = metaChannel.localChannel; this.channel = metaChannel.localChannel;
@ -95,8 +98,8 @@ class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint {
@Override @Override
public public
ParametersWithIV cryptoParameters() { SecretKey cryptoKey() {
return null; return dummyCryptoKey;
} }
@Override @Override

View File

@ -17,15 +17,13 @@ package dorkbox.network.connection.wrapper;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.bouncycastle.crypto.params.KeyParameter; import javax.crypto.SecretKey;
import org.bouncycastle.crypto.params.ParametersWithIV;
import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.ISessionManager;
import dorkbox.network.connection.registration.MetaChannel; import dorkbox.network.connection.registration.MetaChannel;
import dorkbox.util.FastThreadLocal;
import io.netty.bootstrap.DatagramCloseMessage; import io.netty.bootstrap.DatagramCloseMessage;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
@ -43,11 +41,7 @@ class ChannelNetworkWrapper implements ChannelWrapper {
private final String remoteAddress; private final String remoteAddress;
private final boolean isLoopback; private final boolean isLoopback;
// GCM IV. hacky way to prevent tons of GC and to not clobber the original parameters private final SecretKey secretKey;
private final byte[] aesKey; // AES-256 requires 32 bytes
private final byte[] aesIV; // AES-GCM requires 12 bytes
private final FastThreadLocal<ParametersWithIV> cryptoParameters;
public public
ChannelNetworkWrapper(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) { ChannelNetworkWrapper(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) {
@ -72,17 +66,8 @@ class ChannelNetworkWrapper implements ChannelWrapper {
this.remoteAddress = remoteAddress.getAddress().getHostAddress(); this.remoteAddress = remoteAddress.getAddress().getHostAddress();
this.remotePublicKeyChanged = metaChannel.changedRemoteKey; this.remotePublicKeyChanged = metaChannel.changedRemoteKey;
// AES key & IV (only for networked connections) // AES key (only for networked connections)
aesKey = metaChannel.aesKey; secretKey = metaChannel.secretKey;
aesIV = metaChannel.aesIV;
cryptoParameters = new FastThreadLocal<ParametersWithIV>() {
@Override
public
ParametersWithIV initialValue() {
return new ParametersWithIV(new KeyParameter(aesKey), aesIV);
}
};
} }
public final public final
@ -118,14 +103,12 @@ class ChannelNetworkWrapper implements ChannelWrapper {
} }
/** /**
* @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal * @return the AES key.
* because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't
* clobber each other
*/ */
@Override @Override
public public
ParametersWithIV cryptoParameters() { SecretKey cryptoKey() {
return this.cryptoParameters.get(); return this.secretKey;
} }
@Override @Override

View File

@ -15,7 +15,7 @@
*/ */
package dorkbox.network.connection.wrapper; package dorkbox.network.connection.wrapper;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
@ -33,11 +33,9 @@ interface ChannelWrapper {
void flush(); void flush();
/** /**
* @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal * @return the AES key.
* because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't
* clobber each other
*/ */
ParametersWithIV cryptoParameters(); SecretKey cryptoKey();
/** /**
* @return true if this connection is connection on the loopback interface. This is specifically used to dynamically enable/disable * @return true if this connection is connection on the loopback interface. This is specifically used to dynamically enable/disable

View File

@ -22,27 +22,20 @@ import io.netty.channel.ChannelHandlerContext;
// on client this is MessageToMessage (because of the UdpDecoder in the pipeline!) // on client this is MessageToMessage (because of the UdpDecoder in the pipeline!)
public public
class KryoDecoderCrypto extends KryoDecoder { class KryoDecoderTcpCompression extends KryoDecoderTcp {
public public
KryoDecoderCrypto(final NetworkSerializationManager serializationManager) { KryoDecoderTcpCompression(final NetworkSerializationManager serializationManager) {
super(serializationManager); super(serializationManager);
} }
@Override @Override
protected protected
Object readObject(final NetworkSerializationManager serializationManager, Object readObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context, final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception {
final ByteBuf in, Connection_ connection = (Connection_) context.pipeline().last();
final int length) throws Exception { return serializationManager.readWithCompression(connection, in, length);
try {
Connection_ connection = (Connection_) context.pipeline()
.last();
return serializationManager.readWithCrypto(connection, in, length);
} catch (Exception e) {
throw e;
}
} }
} }

View File

@ -0,0 +1,41 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.tcp;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
// on client this is MessageToMessage (because of the UdpDecoder in the pipeline!)
public
class KryoDecoderTcpNone extends KryoDecoderTcp {
public
KryoDecoderTcpNone(final NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
Object readObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception {
Connection_ connection = (Connection_) context.pipeline().last();
return serializationManager.read(connection, in, length);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.tcp;
import java.io.IOException;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoEncoderTcpCompression extends KryoEncoderTcp {
public
KryoEncoderTcpCompression(final NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) context.pipeline().last();
serializationManager.writeWithCompression(connection, buffer, msg);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.tcp;
import java.io.IOException;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoEncoderTcpNone extends KryoEncoderTcp {
public
KryoEncoderTcpNone(final NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(final NetworkSerializationManager serializationManager,
final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) context.pipeline().last();
serializationManager.write(connection, buffer, msg);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.udp;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoDecoderUdpCompression extends KryoDecoderUdp {
public
KryoDecoderUdpCompression(NetworkSerializationManager serializationManager) {
super(serializationManager);
}
protected
Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception {
Connection_ connection = (Connection_) context.pipeline().last();
return serializationManager.readWithCompression(connection, in, length);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.udp;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoDecoderUdpNone extends KryoDecoderUdp {
public
KryoDecoderUdpNone(NetworkSerializationManager serializationManager) {
super(serializationManager);
}
protected
Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception {
Connection_ connection = (Connection_) context.pipeline().last();
return serializationManager.read(connection, in, length);
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.udp;
import java.io.IOException;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoEncoderUdpCompression extends KryoEncoderUdp {
public
KryoEncoderUdpCompression(NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) ctx.pipeline().last();
serializationManager.writeWithCompression(connection, buffer, msg);
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.pipeline.udp;
import java.io.IOException;
import dorkbox.network.connection.Connection_;
import dorkbox.network.serialization.NetworkSerializationManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public
class KryoEncoderUdpNone extends KryoEncoderUdp {
public
KryoEncoderUdpNone(NetworkSerializationManager serializationManager) {
super(serializationManager);
}
@Override
protected
void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException {
Connection_ connection = (Connection_) ctx.pipeline().last();
serializationManager.write(connection, buffer, msg);
}
}

View File

@ -14,7 +14,7 @@
*/ */
package dorkbox.network.rmi; package dorkbox.network.rmi;
import org.bouncycastle.crypto.params.ParametersWithIV; import javax.crypto.SecretKey;
import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.ConnectionPoint;
import dorkbox.network.connection.Connection_; import dorkbox.network.connection.Connection_;
@ -130,13 +130,13 @@ class RmiNopConnection implements Connection_ {
@Override @Override
public public
long getNextGcmSequence() { long nextGcmSequence() {
return 0; return 0;
} }
@Override @Override
public public
ParametersWithIV getCryptoParameters() { SecretKey cryptoKey() {
return null; return null;
} }
} }

View File

@ -30,6 +30,34 @@ import io.netty.buffer.ByteBuf;
public public
interface NetworkSerializationManager extends SerializationManager { interface NetworkSerializationManager extends SerializationManager {
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void write(Connection_ connection, ByteBuf buffer, Object message) throws IOException;
/**
* Reads an object from the buffer.
*
* @param length should ALWAYS be the length of the expected object!
*/
Object read(Connection_ connection, ByteBuf buffer, int length) throws IOException;
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
void writeWithCompression(Connection_ connection, ByteBuf buffer, Object message) throws IOException;
/**
* Reads an object from the buffer.
*
* @param length should ALWAYS be the length of the expected object!
*/
Object readWithCompression(Connection_ connection, ByteBuf buffer, int length) throws IOException;
/** /**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/> * <p/>
@ -39,13 +67,8 @@ interface NetworkSerializationManager extends SerializationManager {
/** /**
* Reads an object from the buffer. * Reads an object from the buffer.
* <p/>
* Crypto + sequence number
* *
* @param connection * @param length should ALWAYS be the length of the expected object!
* can be NULL
* @param length
* should ALWAYS be the length of the expected object!
*/ */
Object readWithCrypto(Connection_ connection, ByteBuf buffer, int length) throws IOException; Object readWithCrypto(Connection_ connection, ByteBuf buffer, int length) throws IOException;

View File

@ -775,38 +775,23 @@ class Serialization implements NetworkSerializationManager {
/** /**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p> * <p/>
* There is a small speed penalty if there were no kryo's available to use. * There is a small speed penalty if there were no kryo's available to use.
*/ */
@Override @Override
public final public final
void writeWithCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { void writeWithCompression(Connection_ connection, ByteBuf buffer, Object message) throws IOException {
final KryoExtra kryo = kryoPool.take(); final KryoExtra kryo = kryoPool.take();
try { try {
// we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU if (wireWriteLogger.isTraceEnabled()) {
if (connection.isLoopback()) { int start = buffer.writerIndex();
if (wireWriteLogger.isTraceEnabled()) { kryo.writeCompressed(connection, buffer, message);
int start = buffer.writerIndex(); int end = buffer.writerIndex();
kryo.writeCompressed(connection, buffer, message);
int end = buffer.writerIndex();
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
}
else {
kryo.writeCompressed(connection, buffer, message);
}
} }
else { else {
if (wireWriteLogger.isTraceEnabled()) { kryo.writeCompressed(connection, buffer, message);
int start = buffer.writerIndex();
kryo.writeCrypto(connection, buffer, message);
int end = buffer.writerIndex();
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
}
else {
kryo.writeCrypto(connection, buffer, message);
}
} }
} finally { } finally {
kryoPool.put(kryo); kryoPool.put(kryo);
@ -815,8 +800,111 @@ class Serialization implements NetworkSerializationManager {
/** /**
* Reads an object from the buffer. * Reads an object from the buffer.
*
* @param length should ALWAYS be the length of the expected object!
*/
@Override
public final
Object read(Connection_ connection, ByteBuf buffer, int length) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
if (wireReadLogger.isTraceEnabled()) {
int start = buffer.readerIndex();
Object object = kryo.read(connection, buffer);
int end = buffer.readerIndex();
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
return object;
}
else {
return kryo.read(connection, buffer);
}
} finally {
kryoPool.put(kryo);
}
}
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p/>
* There is a small speed penalty if there were no kryo's available to use.
*/
@Override
public final
void write(Connection_ connection, ByteBuf buffer, Object message) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
if (wireWriteLogger.isTraceEnabled()) {
int start = buffer.writerIndex();
kryo.write(connection, buffer, message);
int end = buffer.writerIndex();
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
}
else {
kryo.write(connection, buffer, message);
}
} finally {
kryoPool.put(kryo);
}
}
/**
* Reads an object from the buffer.
*
* @param connection can be NULL
* @param length should ALWAYS be the length of the expected object!
*/
@Override
public final
Object readWithCompression(Connection_ connection, ByteBuf buffer, int length) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
if (wireReadLogger.isTraceEnabled()) {
int start = buffer.readerIndex();
Object object = kryo.readCompressed(connection, buffer, length);
int end = buffer.readerIndex();
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
return object;
}
else {
return kryo.readCompressed(connection, buffer, length);
}
} finally {
kryoPool.put(kryo);
}
}
/**
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
* <p> * <p>
* Crypto + sequence number * There is a small speed penalty if there were no kryo's available to use.
*/
@Override
public final
void writeWithCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
final KryoExtra kryo = kryoPool.take();
try {
if (wireWriteLogger.isTraceEnabled()) {
int start = buffer.writerIndex();
kryo.writeCrypto(connection, buffer, message);
int end = buffer.writerIndex();
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
}
else {
kryo.writeCrypto(connection, buffer, message);
}
} finally {
kryoPool.put(kryo);
}
}
/**
* Reads an object from the buffer.
* *
* @param connection can be NULL * @param connection can be NULL
* @param length should ALWAYS be the length of the expected object! * @param length should ALWAYS be the length of the expected object!
@ -827,34 +915,17 @@ class Serialization implements NetworkSerializationManager {
Object readWithCrypto(final Connection_ connection, final ByteBuf buffer, final int length) throws IOException { Object readWithCrypto(final Connection_ connection, final ByteBuf buffer, final int length) throws IOException {
final KryoExtra kryo = kryoPool.take(); final KryoExtra kryo = kryoPool.take();
try { try {
// we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU if (wireReadLogger.isTraceEnabled()) {
if (connection.isLoopback()) { int start = buffer.readerIndex();
if (wireReadLogger.isTraceEnabled()) { Object object = kryo.readCrypto(connection, buffer, length);
int start = buffer.readerIndex(); int end = buffer.readerIndex();
Object object = kryo.readCompressed(connection, buffer, length);
int end = buffer.readerIndex();
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
return object; return object;
}
else {
return kryo.readCompressed(connection, buffer, length);
}
} }
else { else {
if (wireReadLogger.isTraceEnabled()) { return kryo.readCrypto(connection, buffer, length);
int start = buffer.readerIndex();
Object object = kryo.readCrypto(connection, buffer, length);
int end = buffer.readerIndex();
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
return object;
}
else {
return kryo.readCrypto(connection, buffer, length);
}
} }
} finally { } finally {
kryoPool.put(kryo); kryoPool.put(kryo);