diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index 7635ea45..535a0bca 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -26,15 +26,15 @@ import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FastDecompressor; import org.bouncycastle.crypto.engines.AESFastEngine; import org.bouncycastle.crypto.modes.GCMBlockCipher; +import org.slf4j.Logger; import java.io.IOException; +/** + * Nothing in this class is thread safe + */ public class KryoExtra extends Kryo { - - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoExtra.class); - private static final ByteBuf nullByteBuf = (ByteBuf) null; - // this is the minimum size that LZ4 can compress. Anything smaller than this will result in an output LARGER than the input. private static final int LZ4_COMPRESSION_MIN_SIZE = 32; @@ -44,16 +44,21 @@ class KryoExtra extends Kryo { private static final byte compress = (byte) 1; static final byte crypto = (byte) (1 << 1); - private static LZ4Factory factory = LZ4Factory.fastestInstance(); + // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) + // snappyuncomp : 1.391 micros/op; 2808.1 MB/s + // lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%) + // lz4uncomp : 0.641 micros/op; 6097.9 MB/s + private static final LZ4Factory factory = LZ4Factory.fastestInstance(); // for kryo serialization private final ByteBufInput reader = new ByteBufInput(); private final ByteBufOutput writer = new ByteBufOutput(); - // not thread safe - public ConnectionImpl connection; + // volatile to provide object visibility for entire class + public volatile ConnectionImpl connection; + + private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine()); - private final GCMBlockCipher aesEngine; // writing data private final ByteBuf tempBuffer = Unpooled.buffer(EndPoint.udpMaxSize); @@ -66,8 +71,6 @@ class KryoExtra extends Kryo { private byte[] cryptoOutput; - - // reading data private LZ4FastDecompressor decompressor = factory.fastDecompressor(); @@ -79,88 +82,33 @@ class KryoExtra extends Kryo { private byte[] decompressOutput; private ByteBuf decompressBuf; -// private static XXHashFactory hashFactory = XXHashFactory.fastestInstance(); public KryoExtra() { -// final Checksum checksumIn; -// final Checksum checksumOut; -// if (OS.is64bit()) { -// long seed = 0x3f79759cb27bf824L; // this has to stay the same -// checksumIn = hashFactory.newStreamingHash64(seed) -// .asChecksum(); -// checksumOut = hashFactory.newStreamingHash64(seed) -// .asChecksum(); -// -// } -// else { -// int seed = 0xF51BA30E; // this has to stay the same -// checksumIn = hashFactory.newStreamingHash32(seed) -// .asChecksum(); -// checksumOut = hashFactory.newStreamingHash32(seed) -// .asChecksum(); -// } -// - - -// > Our data is partitionaned into 64 blocks which are compressed separetely. -// -// 64 bytes blocks ? This would be much too small. -// -// -// > Would a larger block size improve the compression ratio? -// -// Probably. -// When compressing data using independent blocks, you can observe compression ratio gains up to 1 MB blocks. -// Beyond that, benefits become less ans less visible. -// -// -// > Is there an optimal block size? -// -// For independent blocks, 64KB is considered optimal "small" size. -// Between 4 KB & 64 KB, it is "very small", but still manageable. -// Anything below 4 KB is starting to miss a lost of compression opportunity. Ratio will plummet. -// -// -// > Does anybody have recommendations on how to improve the compression ration? -// -// Anytime you input data consists of -// tables with fixed length cells, -// blosc should be tested : if offers big opportunities for compression savings. - - -// ➜ leveldb git:(lz4) ✗ ./db_bench -// LevelDB: version 1.7 -// Keys: 16 bytes each -// Values: 100 bytes each (50 bytes after compression) -// Entries: 1000000 -// RawSize: 110.6 MB (estimated) -// FileSize: 62.9 MB (estimated) -// ------------------------------------------------ -// snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) -// snappyuncomp : 1.391 micros/op; 2808.1 MB/s -// lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%) -// lz4uncomp : 0.641 micros/op; 6097.9 MB/s - - this.aesEngine = new GCMBlockCipher(new AESFastEngine()); } - public - void write(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException { + /** + * @param connection if != null, perform crypto on the data. + */ + public synchronized + void write(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final Logger logger) throws IOException { + // connection will ALWAYS be of type Connection or NULL. + // used by RMI/some serializers to determine which connection wrote this object + // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI + this.connection = connection; + + // do we compress + crypto the data? (don't always need it, specifically during connection INIT) - if (connection == null || !doCrypto) { + if (connection == null) { // magic byte buffer.writeByte(0); // write the object to the NORMAL output buffer! writer.setBuffer(buffer); - // connection will ALWAYS be of type Connection or NULL. - // used by RMI/some serializers to determine which connection wrote this object - // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI - this.connection = connection; writeClassAndObject(writer, message); } else { + final boolean traceEnabled = logger.isTraceEnabled(); byte magicByte = (byte) 0x00000000; ByteBuf objectOutputBuffer = this.tempBuffer; @@ -169,11 +117,6 @@ class KryoExtra extends Kryo { // write the object to a TEMP buffer! this will be compressed writer.setBuffer(objectOutputBuffer); - // connection will ALWAYS be of type Connection or NULL. - // used by RMI/some serializers to determine which connection wrote this object - // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI - this.connection = connection; - writeClassAndObject(writer, message); // save off how much data the object took + magic byte @@ -212,7 +155,7 @@ class KryoExtra extends Kryo { } if (length > LZ4_COMPRESSION_MIN_SIZE) { - if (logger.isTraceEnabled()) { + if (traceEnabled) { logger.trace("Compressing data {}", connection); } @@ -237,7 +180,8 @@ class KryoExtra extends Kryo { // bytes can now be written to, because our compressed data is stored in a temp array. // ONLY do this if our compressed length is LESS than our uncompressed length. if (compressedLength < length) { - // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST version + // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST + // decompress version BigEndian.Int_.toBytes(length, compressOutput); magicByte |= compress; @@ -251,8 +195,7 @@ class KryoExtra extends Kryo { } } - - if (logger.isTraceEnabled()) { + if (traceEnabled) { logger.trace("AES encrypting data {}", connection); } @@ -263,7 +206,7 @@ class KryoExtra extends Kryo { aes.reset(); aes.init(true, connection.getCryptoParameters()); - byte[] cryptoOutput = this.cryptoOutput; + byte[] cryptoOutput; // lazy initialize the crypto output buffer int cryptoSize = aes.getOutputSize(length); @@ -273,14 +216,15 @@ class KryoExtra extends Kryo { cryptoOutputLength = cryptoSize; cryptoOutput = new byte[cryptoSize]; this.cryptoOutput = cryptoOutput; + } else { + cryptoOutput = this.cryptoOutput; } - //System.err.println("out orig: " + length + " first/last " + inputArray[inputOffset] + " " + inputArray[length-1]); - int actualLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0); + int encryptedLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0); try { // authentication tag for GCM - actualLength += aes.doFinal(cryptoOutput, actualLength); + encryptedLength += aes.doFinal(cryptoOutput, encryptedLength); } catch (Exception e) { throw new IOException("Unable to AES encrypt the data", e); } @@ -291,19 +235,27 @@ class KryoExtra extends Kryo { buffer.writeByte(magicByte); // have to copy over the orig data, because we used the temp buffer - buffer.writeBytes(cryptoOutput, 0, actualLength); + buffer.writeBytes(cryptoOutput, 0, encryptedLength); } } @SuppressWarnings("Duplicates") - public - Object read(final ConnectionImpl connection, final ByteBuf buffer, int length) throws IOException { - // we cannot use the buffer as a "temp" storage, because there is other data on it + public synchronized + Object read(final ConnectionImpl connection, final ByteBuf buffer, int length, final Logger logger) throws IOException { + // connection will ALWAYS be of type IConnection or NULL. + // used by RMI/some serializers to determine which connection read this object + // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI + this.connection = connection; + + + //////////////// + // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! + //////////////// + + ByteBuf inputBuf = buffer; // read off the magic byte final byte magicByte = buffer.readByte(); - ByteBuf inputBuf = buffer; - // compression can ONLY happen if it's ALSO crypto'd if ((magicByte & crypto) == crypto) { @@ -318,7 +270,6 @@ class KryoExtra extends Kryo { logger.trace("AES decrypting data {}", connection); } - // NOTE: compression and encryption MUST work with byte[] because they use JNI! // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... @@ -349,6 +300,8 @@ class KryoExtra extends Kryo { inputOffset = 0; } + // have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index. + buffer.readerIndex(buffer.readerIndex() + length); final GCMBlockCipher aes = this.aesEngine; aes.reset(); @@ -357,19 +310,21 @@ class KryoExtra extends Kryo { int cryptoSize = aes.getOutputSize(length); // lazy initialize the decrypt output buffer - byte[] decryptOutputArray = this.decryptOutput; + byte[] decryptOutputArray; if (cryptoSize > decryptOutputLength) { decryptOutputLength = cryptoSize; decryptOutputArray = new byte[cryptoSize]; this.decryptOutput = decryptOutputArray; decryptBuf = Unpooled.wrappedBuffer(decryptOutputArray); + } else { + decryptOutputArray = this.decryptOutput; } int decryptedLength = aes.processBytes(inputArray, inputOffset, length, decryptOutputArray, 0); try { - // authentication tag for GCM (since we are DECRYPTING, the original 'actualLength' is the correct one.) + // authentication tag for GCM decryptedLength += aes.doFinal(decryptOutputArray, decryptedLength); } catch (Exception e) { throw new IOException("Unable to AES decrypt the data", e); @@ -403,30 +358,11 @@ class KryoExtra extends Kryo { } } - // read the object from the buffer. reader.setBuffer(inputBuf); - // connection will ALWAYS be of type IConnection or NULL. - // used by RMI/some serializers to determine which connection read this object - // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI - this.connection = connection; + return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer - return readClassAndObject(reader); - } - - public final - void releaseWrite() { - // release/reset connection resources - writer.setBuffer(nullByteBuf); - connection = null; - } - - public final - void releaseRead() { - // release/reset connection resources - reader.setBuffer(nullByteBuf); - connection = null; } @Override