From 9ffc8655bcb70c84fefd114027488285ee0b54a4 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 18 Mar 2016 16:04:27 +0100 Subject: [PATCH] Added writeCompressed/readCompressed methods. --- src/dorkbox/network/connection/KryoExtra.java | 185 +++++++++++++++++- 1 file changed, 181 insertions(+), 4 deletions(-) diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index 79d54d36..1c0b98be 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -123,6 +123,186 @@ class KryoExtra extends Kryo { return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer } + public synchronized + void writeCompressed(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.connection = connection; + + ByteBuf objectOutputBuffer = this.tempBuffer; + objectOutputBuffer.clear(); // always have to reset everything + + // write the object to a TEMP buffer! this will be compressed + writer.setBuffer(objectOutputBuffer); + + writeClassAndObject(writer, message); + + // save off how much data the object took + magic byte + int length = objectOutputBuffer.writerIndex(); + + // NOTE: compression and encryption MUST work with byte[] because they use JNI! + // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use + // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... + // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf + + byte[] inputArray; + int inputOffset; + + // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because + // the buffer might be a slice of other buffer or a pooled buffer: + //noinspection Duplicates + if (objectOutputBuffer.hasArray() && + objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) && + objectOutputBuffer.array().length == objectOutputBuffer.capacity()) { + + // we can use it... + inputArray = objectOutputBuffer.array(); + inputArrayLength = -1; // this is so we don't REUSE this array accidentally! + inputOffset = objectOutputBuffer.arrayOffset(); + } + else { + // we can NOT use it. + if (length > inputArrayLength) { + inputArrayLength = length; + inputArray = new byte[length]; + this.inputArray = inputArray; + } + else { + inputArray = this.inputArray; + } + + objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length); + inputOffset = 0; + } + + ////////// compressing data + // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger + // output), will be negated by the increase in size by the encryption + + byte[] compressOutput = this.compressOutput; + + int maxLengthLengthOffset = 5; + int maxCompressedLength = compressor.maxCompressedLength(length); + + // add 5 so there is room to write the compressed size to the buffer + int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset; + + // lazy initialize the compression output buffer + if (maxCompressedLengthWithOffset > compressOutputLength) { + compressOutputLength = maxCompressedLengthWithOffset; + compressOutput = new byte[maxCompressedLengthWithOffset]; + this.compressOutput = compressOutput; + } + + + // LZ4 compress. output offset max 5 bytes to leave room for length of tempOutput data + int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength); + + // bytes can now be written to, because our compressed data is stored in a temp array. + + final int lengthLength = OptimizeUtilsByteArray.intLength(length, true); + + // correct input. compression output is now buffer input + inputArray = compressOutput; + inputOffset = maxLengthLengthOffset - lengthLength; + + + // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST decompress version + OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset); + + // write out the "magic" byte. + buffer.writeByte(crypto); + + // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size + buffer.writeBytes(inputArray, inputOffset, compressedLength + lengthLength); + } + + public + Object readCompressed(final ConnectionImpl connection, final ByteBuf buffer, int length) throws IOException { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.connection = connection; + + + //////////////// + // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! + //////////////// + + ByteBuf inputBuf = buffer; + + // read off the magic byte + final byte magicByte = buffer.readByte(); + + // get the decompressed length (at the beginning of the array) + final int uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true); + final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-5 bytes for the decompressed size + + // have to adjust for the magic byte and uncompressed length + length = length - 1 - lengthLength; + + + ///////// decompress data -- as it's ALWAYS compressed + + // NOTE: compression and encryption MUST work with byte[] because they use JNI! + // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use + // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... + // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf + + byte[] inputArray; + int inputOffset; + + // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because + // the buffer might be a slice of other buffer or a pooled buffer: + //noinspection Duplicates + if (inputBuf.hasArray() && + inputBuf.array()[0] == inputBuf.getByte(0) && + inputBuf.array().length == inputBuf.capacity()) { + + // we can use it... + inputArray = inputBuf.array(); + inputArrayLength = -1; // this is so we don't REUSE this array accidentally! + inputOffset = inputBuf.arrayOffset(); + } + else { + // we can NOT use it. + if (length > inputArrayLength) { + inputArrayLength = length; + inputArray = new byte[length]; + this.inputArray = inputArray; + } + else { + inputArray = this.inputArray; + } + + inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length); + inputOffset = 0; + } + + // have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index. + buffer.readerIndex(buffer.readerIndex() + length); + + + ///////// decompress data -- as it's ALWAYS compressed + + byte[] decompressOutputArray = this.decompressOutput; + if (uncompressedLength > decompressOutputLength) { + decompressOutputLength = uncompressedLength; + decompressOutputArray = new byte[uncompressedLength]; + this.decompressOutput = decompressOutputArray; + + decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo + } + inputBuf = decompressBuf; + + // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor + decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength); + + inputBuf.setIndex(0, uncompressedLength); + + + // read the object from the buffer. + reader.setBuffer(inputBuf); + + return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer + } public synchronized void writeCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException { @@ -137,7 +317,7 @@ class KryoExtra extends Kryo { writeClassAndObject(writer, message); - // save off how much data the object took + magic byte + // save off how much data the object took int length = objectOutputBuffer.writerIndex(); @@ -258,8 +438,6 @@ class KryoExtra extends Kryo { // write out our GCM counter OptimizeUtilsByteBuf.writeLong(buffer, nextGcmSequence, true); -// System.err.println("out " + gcmIVCounter); - // have to copy over the orig data, because we used the temp buffer buffer.writeBytes(cryptoOutput, 0, encryptedLength); } @@ -280,7 +458,6 @@ class KryoExtra extends Kryo { final byte magicByte = buffer.readByte(); final long gcmIVCounter = OptimizeUtilsByteBuf.readLong(buffer, true); -// System.err.println("in " + gcmIVCounter); // compression can ONLY happen if it's ALSO crypto'd