Added writeCompressed/readCompressed methods.

nathan 2016-03-18 16:04:27 +01:00
parent 739315afb4
commit 9ffc8655bc


@@ -123,6 +123,186 @@ class KryoExtra extends Kryo {
return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer
}
public synchronized
void writeCompressed(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection;
ByteBuf objectOutputBuffer = this.tempBuffer;
objectOutputBuffer.clear(); // always have to reset everything
// write the object to a TEMP buffer! this will be compressed
writer.setBuffer(objectOutputBuffer);
writeClassAndObject(writer, message);
// save off how much data the object took + magic byte
int length = objectOutputBuffer.writerIndex();
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing array out of a heap buffer once it has been resized or
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), using it isn't always possible because
// the buffer might be a slice of another buffer or a pooled buffer:
//noinspection Duplicates
if (objectOutputBuffer.hasArray() &&
objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) &&
objectOutputBuffer.array().length == objectOutputBuffer.capacity()) {
// we can use it...
inputArray = objectOutputBuffer.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = objectOutputBuffer.arrayOffset();
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the cost of compressing a small input
// (which could produce a slightly larger output) is negated by the size increase from encryption anyway
byte[] compressOutput = this.compressOutput;
int maxLengthLengthOffset = 5;
int maxCompressedLength = compressor.maxCompressedLength(length);
// add 5 so there is room to write the varint of the UNCOMPRESSED size in front of the compressed data
int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset;
// lazy initialize the compression output buffer
if (maxCompressedLengthWithOffset > compressOutputLength) {
compressOutputLength = maxCompressedLengthWithOffset;
compressOutput = new byte[maxCompressedLengthWithOffset];
this.compressOutput = compressOutput;
}
// LZ4 compress. output starts at offset 5 to leave room for the varint of the original (uncompressed) length
int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength);
// the output buffer can now be written to, because our compressed data is held in the temp array.
final int lengthLength = OptimizeUtilsByteArray.intLength(length, true);
// correct input. compression output is now buffer input
inputArray = compressOutput;
inputOffset = maxLengthLengthOffset - lengthLength;
// now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST decompress version
OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset);
// write out the "magic" byte.
buffer.writeByte(crypto);
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
buffer.writeBytes(inputArray, inputOffset, compressedLength + lengthLength);
}
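
As a standalone illustration of the framing writeCompressed appends after the magic byte, here is a minimal sketch against the lz4-java (net.jpountz.lz4) block API, with a hand-rolled varint standing in for OptimizeUtilsByteArray (the class name and varint details are assumptions, not this library's code):

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

public final class Lz4FrameSketch {
    private static final int MAX_VARINT = 5;  // a varint-encoded int is at most 5 bytes

    // hypothetical stand-in for OptimizeUtilsByteArray.intLength(value, true)
    static int varIntLength(int value) {
        int length = 1;
        while ((value & ~0x7F) != 0) {
            value >>>= 7;
            length++;
        }
        return length;
    }

    // hypothetical stand-in for OptimizeUtilsByteArray.writeInt(out, value, true, offset)
    static void writeVarInt(byte[] out, int offset, int value) {
        while ((value & ~0x7F) != 0) {
            out[offset++] = (byte) ((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out[offset] = (byte) value;
    }

    /** Returns [varint uncompressedLength][LZ4 block], the layout written above. */
    static byte[] frame(byte[] src) {
        LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
        int maxCompressedLength = compressor.maxCompressedLength(src.length);

        // compress at offset 5 so the 1-5 byte length fits directly in front of the block
        byte[] scratch = new byte[maxCompressedLength + MAX_VARINT];
        int compressedLength = compressor.compress(src, 0, src.length, scratch, MAX_VARINT, maxCompressedLength);

        int lengthLength = varIntLength(src.length);
        int start = MAX_VARINT - lengthLength;
        writeVarInt(scratch, start, src.length);

        byte[] framed = new byte[lengthLength + compressedLength];
        System.arraycopy(scratch, start, framed, 0, framed.length);
        return framed;
    }
}

Reserving the full 5 bytes up front means the 1-5 byte length can be written immediately before the compressed bytes without a second copy of the block itself.
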
public
Object readCompressed(final ConnectionImpl connection, final ByteBuf buffer, int length) throws IOException {
// required by RMI and some serializers to determine which connection wrote (or has info about) this object
this.connection = connection;
////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
////////////////
ByteBuf inputBuf = buffer;
// read off the magic byte
final byte magicByte = buffer.readByte();
// get the decompressed length (at the beginning of the array)
final int uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true);
final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // the varint of the decompressed size takes 1-5 bytes
// have to adjust for the magic byte and uncompressed length
length = length - 1 - lengthLength;
///////// decompress data -- as it's ALWAYS compressed
// NOTE: compression and encryption MUST work with byte[] because they use JNI!
// Realistically, it is impossible to get the backing array out of a heap buffer once it has been resized or
// sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
// see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf
byte[] inputArray;
int inputOffset;
// Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), using it isn't always possible because
// the buffer might be a slice of another buffer or a pooled buffer:
//noinspection Duplicates
if (inputBuf.hasArray() &&
inputBuf.array()[0] == inputBuf.getByte(0) &&
inputBuf.array().length == inputBuf.capacity()) {
// we can use it...
inputArray = inputBuf.array();
inputArrayLength = -1; // this is so we don't REUSE this array accidentally!
inputOffset = inputBuf.arrayOffset();
}
else {
// we can NOT use it.
if (length > inputArrayLength) {
inputArrayLength = length;
inputArray = new byte[length];
this.inputArray = inputArray;
}
else {
inputArray = this.inputArray;
}
inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length);
inputOffset = 0;
}
// have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index.
buffer.readerIndex(buffer.readerIndex() + length);
///////// decompress data -- as it's ALWAYS compressed
byte[] decompressOutputArray = this.decompressOutput;
if (uncompressedLength > decompressOutputLength) {
decompressOutputLength = uncompressedLength;
decompressOutputArray = new byte[uncompressedLength];
this.decompressOutput = decompressOutputArray;
decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo
}
inputBuf = decompressBuf;
// LZ4 decompress. requires the ORIGINAL (uncompressed) length, because we use the FAST decompressor
decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength);
inputBuf.setIndex(0, uncompressedLength);
// read the object from the buffer.
reader.setBuffer(inputBuf);
return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer
}
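
The matching read side, again as a hedged sketch against lz4-java: the varint in front supplies the destination size the FAST decompressor requires, and the decompressor itself reports how many compressed bytes it consumed (the varint reader is a stand-in for OptimizeUtilsByteBuf.readInt):

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

/** Reverses frame(): reads the varint length, then fast-decompresses the LZ4 block. */
static byte[] unframe(byte[] framed) {
    // read the varint uncompressed length from the front
    int uncompressedLength = 0;
    int offset = 0;
    for (int shift = 0; ; shift += 7) {
        byte b = framed[offset++];
        uncompressedLength |= (b & 0x7F) << shift;
        if ((b & 0x80) == 0) break;
    }

    byte[] restored = new byte[uncompressedLength];
    LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
    // the FAST decompressor is given the ORIGINAL length and returns the number of
    // compressed bytes it read -- which is exactly why the length is framed in up front
    decompressor.decompress(framed, offset, restored, 0, uncompressedLength);
    return restored;
}
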
public synchronized
void writeCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
@@ -137,7 +317,7 @@ class KryoExtra extends Kryo {
writeClassAndObject(writer, message);
// save off how much data the object took + magic byte
// save off how much data the object took
int length = objectOutputBuffer.writerIndex();
@@ -258,8 +438,6 @@ class KryoExtra extends Kryo {
// write out our GCM counter
OptimizeUtilsByteBuf.writeLong(buffer, nextGcmSequence, true);
// System.err.println("out " + gcmIVCounter);
// have to copy over the orig data, because we used the temp buffer
buffer.writeBytes(cryptoOutput, 0, encryptedLength);
}
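
The GCM counter above is written with a variable-length encoding (1-9 bytes for a long). A minimal sketch of that scheme against a Netty ByteBuf, assuming Kryo-style 7-bit groups with a continuation bit; the library's OptimizeUtilsByteBuf may differ in detail:

import io.netty.buffer.ByteBuf;

// varlong: 7 bits per byte, high bit set while more bytes follow
static void writeVarLong(ByteBuf buf, long value) {
    while ((value & ~0x7FL) != 0) {
        buf.writeByte((int) ((value & 0x7F) | 0x80));
        value >>>= 7;
    }
    buf.writeByte((int) value);
}

static long readVarLong(ByteBuf buf) {
    long value = 0;
    for (int shift = 0; ; shift += 7) {
        byte b = buf.readByte();
        value |= (long) (b & 0x7F) << shift;
        if ((b & 0x80) == 0) return value;
    }
}
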
@@ -280,7 +458,6 @@ class KryoExtra extends Kryo {
final byte magicByte = buffer.readByte();
final long gcmIVCounter = OptimizeUtilsByteBuf.readLong(buffer, true);
// System.err.println("in " + gcmIVCounter);
// compression can ONLY happen if it's ALSO crypto'd
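
Both new methods repeat the same "backing array or copy" heuristic before handing bytes to the JNI-backed codecs. Factored out as a hypothetical helper (not part of this commit), it could look like this:

import io.netty.buffer.ByteBuf;

// Hand JNI-based codecs a byte[] view of a ByteBuf: zero-copy when the backing
// array is safely usable, otherwise via a grow-only scratch array that is reused.
final class ArrayView {
    byte[] array;
    int offset;
    private byte[] scratch = new byte[0];

    void of(ByteBuf buf, int length) {
        // the backing array is only directly usable when the buffer is not a slice
        // of a larger buffer and not a pooled buffer carved out of a bigger arena
        if (buf.hasArray()
                && buf.array()[0] == buf.getByte(0)
                && buf.array().length == buf.capacity()) {
            array = buf.array();
            offset = buf.arrayOffset();
            return;
        }
        if (length > scratch.length) {
            scratch = new byte[length];  // grow-only, reused across calls
        }
        buf.getBytes(buf.readerIndex(), scratch, 0, length);
        array = scratch;
        offset = 0;
    }
}
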