Changed netty -> agrona for compression

This commit is contained in:
nathan 2020-08-13 17:03:06 +02:00
parent bb6cacbaee
commit 8857d1d016

View File

@ -22,10 +22,8 @@ import dorkbox.network.pipeline.AeronInput
import dorkbox.network.pipeline.AeronOutput import dorkbox.network.pipeline.AeronOutput
import dorkbox.network.rmi.CachedMethod import dorkbox.network.rmi.CachedMethod
import dorkbox.os.OS import dorkbox.os.OS
import dorkbox.util.Sys
import dorkbox.util.bytes.OptimizeUtilsByteArray import dorkbox.util.bytes.OptimizeUtilsByteArray
import dorkbox.util.bytes.OptimizeUtilsByteBuf
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufUtil
import net.jpountz.lz4.LZ4Factory import net.jpountz.lz4.LZ4Factory
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import org.agrona.collections.Int2ObjectHashMap import org.agrona.collections.Int2ObjectHashMap
@ -221,7 +219,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
* + class and object bytes + * + class and object bytes +
* ++++++++++++++++++++++++++ * ++++++++++++++++++++++++++
*/ */
fun writeCompressed(logger: Logger, buffer: ByteBuf, message: Any) { fun writeCompressed(logger: Logger, output: Output, message: Any) {
// write the object to a TEMP buffer! this will be compressed later // write the object to a TEMP buffer! this will be compressed later
write(writer, message) write(writer, message)
@ -236,9 +234,10 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
// LZ4 compress. // LZ4 compress.
val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
if (DEBUG) { if (DEBUG) {
val orig = ByteBufUtil.hexDump(writer.buffer, 0, length) val orig = Sys.bytesToHex(writer.buffer, 0, length)
val compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength) val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
logger.error(OS.LINE_SEPARATOR + logger.error(OS.LINE_SEPARATOR +
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
OS.LINE_SEPARATOR + OS.LINE_SEPARATOR +
@ -246,10 +245,10 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
} }
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
OptimizeUtilsByteBuf.writeInt(buffer, length, true) output.writeInt(length, true)
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
buffer.writeBytes(compressOutput, 0, compressedLength) output.writeBytes(compressOutput, 0, compressedLength)
} }
/** /**
@ -263,7 +262,7 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
* + class and object bytes + * + class and object bytes +
* ++++++++++++++++++++++++++ * ++++++++++++++++++++++++++
*/ */
fun writeCompressed(logger: Logger, connection: Connection, buffer: ByteBuf, message: Any) { fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) {
// write the object to a TEMP buffer! this will be compressed later // write the object to a TEMP buffer! this will be compressed later
write(connection, writer, message) write(connection, writer, message)
@ -278,9 +277,10 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
// LZ4 compress. // LZ4 compress.
val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
if (DEBUG) { if (DEBUG) {
val orig = ByteBufUtil.hexDump(writer.buffer, 0, length) val orig = Sys.bytesToHex(writer.buffer, 0, length)
val compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength) val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
logger.error(OS.LINE_SEPARATOR + logger.error(OS.LINE_SEPARATOR +
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
OS.LINE_SEPARATOR + OS.LINE_SEPARATOR +
@ -288,10 +288,10 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
} }
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
OptimizeUtilsByteBuf.writeInt(buffer, length, true) output.writeInt(length, true)
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
buffer.writeBytes(compressOutput, 0, compressedLength) output.writeBytes(compressOutput, 0, compressedLength)
} }
/** /**
@ -307,37 +307,38 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
* + class and object bytes + * + class and object bytes +
* ++++++++++++++++++++++++++ * ++++++++++++++++++++++++++
*/ */
fun readCompressed(logger: Logger, buffer: ByteBuf, length: Int): Any { fun readCompressed(logger: Logger, input: Input, length: Int): Any {
//////////////// ////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
//////////////// ////////////////
// get the decompressed length (at the beginning of the array) // get the decompressed length (at the beginning of the array)
var length = length var length = length
val uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true) val uncompressedLength = input.readInt(true)
if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) { if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) {
throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!") throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!")
} }
// because 1-4 bytes for the decompressed size (this number is never negative) // because 1-4 bytes for the decompressed size (this number is never negative)
val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true) val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true)
val start = buffer.readerIndex() val start = input.position()
// have to adjust for uncompressed length-length // have to adjust for uncompressed length-length
length = length - lengthLength length = length - lengthLength
///////// decompress data ///////// decompress data
buffer.readBytes(temp, 0, length) input.readBytes(temp, 0, length)
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
reader.reset() reader.reset()
decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength) decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength)
reader.setLimit(uncompressedLength) reader.setLimit(uncompressedLength)
if (DEBUG) { if (DEBUG) {
val compressed = ByteBufUtil.hexDump(buffer, start, length) val compressed = Sys.bytesToHex(temp, start, length)
val orig = ByteBufUtil.hexDump(reader.buffer, start, uncompressedLength) val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength)
logger.error(OS.LINE_SEPARATOR + logger.error(OS.LINE_SEPARATOR +
"COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed + "COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR + OS.LINE_SEPARATOR +
@ -359,28 +360,28 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
* + class and object bytes + * + class and object bytes +
* ++++++++++++++++++++++++++ * ++++++++++++++++++++++++++
*/ */
fun readCompressed(logger: Logger, connection: Connection, buffer: ByteBuf, length: Int): Any { fun readCompressed(logger: Logger, connection: Connection, input: Input, length: Int): Any {
//////////////// ////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
//////////////// ////////////////
// get the decompressed length (at the beginning of the array) // get the decompressed length (at the beginning of the array)
var length = length var length = length
val uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true) val uncompressedLength = input.readInt(true)
if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) { if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) {
throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!") throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!")
} }
// because 1-4 bytes for the decompressed size (this number is never negative) // because 1-4 bytes for the decompressed size (this number is never negative)
val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true) val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true)
val start = buffer.readerIndex() val start = input.position()
// have to adjust for uncompressed length-length // have to adjust for uncompressed length-length
length = length - lengthLength length = length - lengthLength
///////// decompress data ///////// decompress data
buffer.readBytes(temp, 0, length) input.readBytes(temp, 0, length)
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
@ -388,8 +389,8 @@ class KryoExtra(private val methodCache: Int2ObjectHashMap<Array<CachedMethod>>)
decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength) decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength)
reader.setLimit(uncompressedLength) reader.setLimit(uncompressedLength)
if (DEBUG) { if (DEBUG) {
val compressed = ByteBufUtil.hexDump(buffer, start, length) val compressed = Sys.bytesToHex(input.readAllBytes(), start, length)
val orig = ByteBufUtil.hexDump(reader.buffer, start, uncompressedLength) val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength)
logger.error(OS.LINE_SEPARATOR + logger.error(OS.LINE_SEPARATOR +
"COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed + "COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR + OS.LINE_SEPARATOR +