Commented out kryo compression and encryption buffers + logic

This commit is contained in:
nathan 2020-09-03 22:21:27 +02:00
parent f2bd71d8c4
commit ca32ef36e1

View File

@ -19,13 +19,7 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
import dorkbox.network.connection.Connection import dorkbox.network.connection.Connection
import dorkbox.os.OS
import dorkbox.util.Sys
import dorkbox.util.bytes.OptimizeUtilsByteArray
import net.jpountz.lz4.LZ4Factory
import org.agrona.DirectBuffer import org.agrona.DirectBuffer
import org.slf4j.Logger
import java.io.IOException
/** /**
* Nothing in this class is thread safe * Nothing in this class is thread safe
@ -36,34 +30,32 @@ class KryoExtra() : Kryo() {
val writerBuffer = AeronOutput() val writerBuffer = AeronOutput()
// crypto + compression have to work with native byte arrays, so here we go... // crypto + compression have to work with native byte arrays, so here we go...
private val reader = Input(ABSOLUTE_MAX_SIZE_OBJECT) // private val reader = Input(ABSOLUTE_MAX_SIZE_OBJECT)
private val writer = Output(ABSOLUTE_MAX_SIZE_OBJECT) // private val writer = Output(ABSOLUTE_MAX_SIZE_OBJECT)
private val temp = ByteArray(ABSOLUTE_MAX_SIZE_OBJECT) // private val temp = ByteArray(ABSOLUTE_MAX_SIZE_OBJECT)
// This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread // This is unique per connection. volatile/etc is not necessary because it is set/read in the same thread
lateinit var connection: Connection lateinit var connection: Connection
// private val secureRandom = SecureRandom() // private val secureRandom = SecureRandom()
// private var cipher: Cipher? = null // private var cipher: Cipher? = null
private val compressor = factory.fastCompressor() // private val compressor = factory.fastCompressor()
private val decompressor = factory.fastDecompressor() // private val decompressor = factory.fastDecompressor()
//
companion object { // companion object {
private const val ABSOLUTE_MAX_SIZE_OBJECT = 500000 // by default, this is about 500k // private const val ABSOLUTE_MAX_SIZE_OBJECT = 500000 // by default, this is about 500k
private const val DEBUG = false // private const val DEBUG = false
//
// snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) // // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
// snappyuncomp : 1.391 micros/op; 2808.1 MB/s // // snappyuncomp : 1.391 micros/op; 2808.1 MB/s
// lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%) // // lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%)
// lz4uncomp : 0.641 micros/op; 6097.9 MB/s // // lz4uncomp : 0.641 micros/op; 6097.9 MB/s
private val factory = LZ4Factory.fastestInstance() // private val factory = LZ4Factory.fastestInstance()
private const val ALGORITHM = "AES/GCM/NoPadding" // private const val ALGORITHM = "AES/GCM/NoPadding"
private const val TAG_LENGTH_BIT = 128 // private const val TAG_LENGTH_BIT = 128
private const val IV_LENGTH_BYTE = 12 // private const val IV_LENGTH_BYTE = 12
} // }
// init { // init {
// cipher = try { // cipher = try {
@ -152,7 +144,7 @@ class KryoExtra() : Kryo() {
* + class and object bytes + * + class and object bytes +
* ++++++++++++++++++++++++++ * ++++++++++++++++++++++++++
*/ */
private fun write(writer: Output, message: Any) { fun write(writer: Output, message: Any) {
// write the object to the NORMAL output buffer! // write the object to the NORMAL output buffer!
writer.reset() writer.reset()
writeClassAndObject(writer, message) writeClassAndObject(writer, message)
@ -197,218 +189,218 @@ class KryoExtra() : Kryo() {
return readClassAndObject(reader) return readClassAndObject(reader)
} }
//
/** // /**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! // * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
* // *
* BUFFER: // * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data + // * + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* // *
* COMPRESSED DATA: // * COMPRESSED DATA:
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
* + class and object bytes + // * + class and object bytes +
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
*/ // */
fun writeCompressed(logger: Logger, output: Output, message: Any) { // fun writeCompressed(logger: Logger, output: Output, message: Any) {
// write the object to a TEMP buffer! this will be compressed later // // write the object to a TEMP buffer! this will be compressed later
write(writer, message) // write(writer, message)
//
// save off how much data the object took // // save off how much data the object took
val length = writer.position() // val length = writer.position()
val maxCompressedLength = compressor.maxCompressedLength(length) // val maxCompressedLength = compressor.maxCompressedLength(length)
//
////////// compressing data // ////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// output), will be negated by the increase in size by the encryption // // output), will be negated by the increase in size by the encryption
val compressOutput = temp // val compressOutput = temp
//
// LZ4 compress. // // LZ4 compress.
val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) // val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
//
if (DEBUG) { // if (DEBUG) {
val orig = Sys.bytesToHex(writer.buffer, 0, length) // val orig = Sys.bytesToHex(writer.buffer, 0, length)
val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength) // val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
logger.error(OS.LINE_SEPARATOR + // logger.error(OS.LINE_SEPARATOR +
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + // "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
OS.LINE_SEPARATOR + // OS.LINE_SEPARATOR +
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) // "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
} // }
//
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version // // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
output.writeInt(length, true) // output.writeInt(length, true)
//
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size // // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
output.writeBytes(compressOutput, 0, compressedLength) // output.writeBytes(compressOutput, 0, compressedLength)
} // }
//
/** // /**
* BUFFER: // * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data + // * + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* // *
* COMPRESSED DATA: // * COMPRESSED DATA:
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
* + class and object bytes + // * + class and object bytes +
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
*/ // */
fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) { // fun writeCompressed(logger: Logger, connection: Connection, output: Output, message: Any) {
// write the object to a TEMP buffer! this will be compressed later // // write the object to a TEMP buffer! this will be compressed later
write(connection, writer, message) // write(connection, writer, message)
//
// save off how much data the object took // // save off how much data the object took
val length = writer.position() // val length = writer.position()
val maxCompressedLength = compressor.maxCompressedLength(length) // val maxCompressedLength = compressor.maxCompressedLength(length)
//
////////// compressing data // ////////// compressing data
// we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
// output), will be negated by the increase in size by the encryption // // output), will be negated by the increase in size by the encryption
val compressOutput = temp // val compressOutput = temp
//
// LZ4 compress. // // LZ4 compress.
val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength) // val compressedLength = compressor.compress(writer.buffer, 0, length, compressOutput, 0, maxCompressedLength)
//
if (DEBUG) { // if (DEBUG) {
val orig = Sys.bytesToHex(writer.buffer, 0, length) // val orig = Sys.bytesToHex(writer.buffer, 0, length)
val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength) // val compressed = Sys.bytesToHex(compressOutput, 0, compressedLength)
logger.error(OS.LINE_SEPARATOR + // logger.error(OS.LINE_SEPARATOR +
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + // "ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
OS.LINE_SEPARATOR + // OS.LINE_SEPARATOR +
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed) // "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed)
} // }
//
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version // // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
output.writeInt(length, true) // output.writeInt(length, true)
//
// have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size // // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size
output.writeBytes(compressOutput, 0, compressedLength) // output.writeBytes(compressOutput, 0, compressedLength)
} // }
//
/** // /**
* NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI! // * NOTE: THIS CANNOT BE USED FOR ANYTHING RELATED TO RMI!
* // *
* BUFFER: // * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data + // * + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* // *
* COMPRESSED DATA: // * COMPRESSED DATA:
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
* + class and object bytes + // * + class and object bytes +
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
*/ // */
fun readCompressed(logger: Logger, input: Input, length: Int): Any { // fun readCompressed(logger: Logger, input: Input, length: Int): Any {
//////////////// // ////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! // // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
//////////////// // ////////////////
//
// get the decompressed length (at the beginning of the array) // // get the decompressed length (at the beginning of the array)
var length = length // var length = length
val uncompressedLength = input.readInt(true) // val uncompressedLength = input.readInt(true)
if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) { // if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) {
throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!") // throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!")
} // }
//
// because 1-4 bytes for the decompressed size (this number is never negative) // // because 1-4 bytes for the decompressed size (this number is never negative)
val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true) // val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true)
val start = input.position() // val start = input.position()
//
// have to adjust for uncompressed length-length // // have to adjust for uncompressed length-length
length = length - lengthLength // length = length - lengthLength
//
//
///////// decompress data // ///////// decompress data
input.readBytes(temp, 0, length) // input.readBytes(temp, 0, length)
//
//
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) // // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
reader.reset() // reader.reset()
decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength) // decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength)
reader.setLimit(uncompressedLength) // reader.setLimit(uncompressedLength)
//
if (DEBUG) { // if (DEBUG) {
val compressed = Sys.bytesToHex(temp, start, length) // val compressed = Sys.bytesToHex(temp, start, length)
val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength) // val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength)
logger.error(OS.LINE_SEPARATOR + // logger.error(OS.LINE_SEPARATOR +
"COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed + // "COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR + // OS.LINE_SEPARATOR +
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig) // "ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig)
} // }
//
// read the object from the buffer. // // read the object from the buffer.
return read(reader) // return read(reader)
} // }
//
/** // /**
* BUFFER: // * BUFFER:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data + // * + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* // *
* COMPRESSED DATA: // * COMPRESSED DATA:
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
* + class and object bytes + // * + class and object bytes +
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
*/ // */
fun readCompressed(logger: Logger, connection: Connection, input: Input, length: Int): Any { // fun readCompressed(logger: Logger, connection: Connection, input: Input, length: Int): Any {
//////////////// // ////////////////
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! // // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
//////////////// // ////////////////
//
// get the decompressed length (at the beginning of the array) // // get the decompressed length (at the beginning of the array)
var length = length // var length = length
val uncompressedLength = input.readInt(true) // val uncompressedLength = input.readInt(true)
if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) { // if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) {
throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!") // throw IOException("Uncompressed size ($uncompressedLength) is larger than max allowed size ($ABSOLUTE_MAX_SIZE_OBJECT)!")
} // }
//
// because 1-4 bytes for the decompressed size (this number is never negative) // // because 1-4 bytes for the decompressed size (this number is never negative)
val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true) // val lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true)
val start = input.position() // val start = input.position()
//
// have to adjust for uncompressed length-length // // have to adjust for uncompressed length-length
length = length - lengthLength // length = length - lengthLength
//
//
///////// decompress data // ///////// decompress data
input.readBytes(temp, 0, length) // input.readBytes(temp, 0, length)
//
//
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) // // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
reader.reset() // reader.reset()
decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength) // decompressor.decompress(temp, 0, reader.buffer, 0, uncompressedLength)
reader.setLimit(uncompressedLength) // reader.setLimit(uncompressedLength)
if (DEBUG) { // if (DEBUG) {
val compressed = Sys.bytesToHex(input.readAllBytes(), start, length) // val compressed = Sys.bytesToHex(input.readAllBytes(), start, length)
val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength) // val orig = Sys.bytesToHex(reader.buffer, start, uncompressedLength)
logger.error(OS.LINE_SEPARATOR + // logger.error(OS.LINE_SEPARATOR +
"COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed + // "COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
OS.LINE_SEPARATOR + // OS.LINE_SEPARATOR +
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig) // "ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig)
} // }
//
// read the object from the buffer. // // read the object from the buffer.
return read(connection, reader) // return read(connection, reader)
} // }
//
/** // /**
* BUFFER: // * BUFFER:
* +++++++++++++++++++++++++++++++ // * +++++++++++++++++++++++++++++++
* + IV (12) + encrypted data + // * + IV (12) + encrypted data +
* +++++++++++++++++++++++++++++++ // * +++++++++++++++++++++++++++++++
* // *
* ENCRYPTED DATA: // * ENCRYPTED DATA:
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* + uncompressed length (1-4 bytes) + compressed data + // * + uncompressed length (1-4 bytes) + compressed data +
* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* // *
* COMPRESSED DATA: // * COMPRESSED DATA:
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
* + class and object bytes + // * + class and object bytes +
* ++++++++++++++++++++++++++ // * ++++++++++++++++++++++++++
*/ // */
// fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) { // fun writeCrypto(logger: Logger, connection: Connection_, buffer: ByteBuf, message: Any) {
// // write the object to a TEMP buffer! this will be compressed later // // write the object to a TEMP buffer! this will be compressed later
// write(connection, writer, message) // write(connection, writer, message)