Fixed issues with using the backing arrays; code polish.

nathan 2016-03-06 23:56:17 +01:00
parent e2f3f3aca7
commit e1823c0f3b
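Note (not part of the diff below; a minimal illustrative sketch): the core fix is that the LZ4/AES code must work on plain byte[] arrays, and pulling the backing array out of a Netty ByteBuf does not advance the buffer's readerIndex, so the index has to be moved by hand afterwards. The sketch assumes Netty 4's ByteBuf API and uses a hypothetical readableArray() helper that is not in this codebase:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Arrays;

public class BackingArrayExample {
    /**
     * Returns the next 'length' readable bytes of 'buffer' as a byte[], using the
     * backing array when one is available and copying otherwise, then advances the
     * readerIndex manually -- converting to an array does NOT move the reader index.
     */
    static byte[] readableArray(ByteBuf buffer, int length) {
        byte[] array;
        int offset;

        if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.array().length == buffer.capacity()) {
            // plain heap buffer (not sliced/offset): use the backing array directly
            array = buffer.array();
            offset = buffer.readerIndex();
        } else {
            // direct, sliced, or pooled buffer: copy the readable region into a temp array
            array = new byte[length];
            buffer.getBytes(buffer.readerIndex(), array, 0, length);
            offset = 0;
        }

        // neither array() nor getBytes(index, ...) moves the reader index, so advance it here
        buffer.readerIndex(buffer.readerIndex() + length);

        // normally the offset would be returned alongside the array; copied here to keep the sketch short
        return offset == 0 ? array : Arrays.copyOfRange(array, offset, offset + length);
    }

    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(32);
        buf.writeBytes(new byte[] {1, 2, 3, 4});

        byte[] data = readableArray(buf, buf.readableBytes());
        System.out.println("bytes read: " + data.length + ", readerIndex now: " + buf.readerIndex());
    }
}

The commit also restructures the lazily grown cryptoOutput/decryptOutput buffers so that each branch assigns the local array explicitly (the new else branches in write() and read()), rather than reading the field before the lazy-initialization check.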


@@ -26,15 +26,15 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import org.bouncycastle.crypto.engines.AESFastEngine;
 import org.bouncycastle.crypto.modes.GCMBlockCipher;
+import org.slf4j.Logger;

 import java.io.IOException;

+/**
+ * Nothing in this class is thread safe
+ */
 public
 class KryoExtra extends Kryo {
-    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KryoExtra.class);
-    private static final ByteBuf nullByteBuf = (ByteBuf) null;

     // this is the minimum size that LZ4 can compress. Anything smaller than this will result in an output LARGER than the input.
     private static final int LZ4_COMPRESSION_MIN_SIZE = 32;
@@ -44,16 +44,21 @@ class KryoExtra extends Kryo {
     private static final byte compress = (byte) 1;
     static final byte crypto = (byte) (1 << 1);

-    private static LZ4Factory factory = LZ4Factory.fastestInstance();
+    // snappycomp   : 7.534 micros/op;  518.5 MB/s (output: 55.1%)
+    // snappyuncomp : 1.391 micros/op; 2808.1 MB/s
+    // lz4comp      : 6.210 micros/op;  629.0 MB/s (output: 55.4%)
+    // lz4uncomp    : 0.641 micros/op; 6097.9 MB/s
+    private static final LZ4Factory factory = LZ4Factory.fastestInstance();

     // for kryo serialization
     private final ByteBufInput reader = new ByteBufInput();
     private final ByteBufOutput writer = new ByteBufOutput();

-    // not thread safe
-    public ConnectionImpl connection;
+    // volatile to provide object visibility for entire class
+    public volatile ConnectionImpl connection;

-    private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine());
+    private final GCMBlockCipher aesEngine;

     // writing data
     private final ByteBuf tempBuffer = Unpooled.buffer(EndPoint.udpMaxSize);
@@ -66,8 +71,6 @@ class KryoExtra extends Kryo {
     private byte[] cryptoOutput;

     // reading data
     private LZ4FastDecompressor decompressor = factory.fastDecompressor();
@@ -79,88 +82,33 @@ class KryoExtra extends Kryo {
     private byte[] decompressOutput;
     private ByteBuf decompressBuf;

-    // private static XXHashFactory hashFactory = XXHashFactory.fastestInstance();

     public
     KryoExtra() {
-        // final Checksum checksumIn;
-        // final Checksum checksumOut;
-        // if (OS.is64bit()) {
-        //     long seed = 0x3f79759cb27bf824L; // this has to stay the same
-        //     checksumIn = hashFactory.newStreamingHash64(seed)
-        //                             .asChecksum();
-        //     checksumOut = hashFactory.newStreamingHash64(seed)
-        //                              .asChecksum();
-        // }
-        // else {
-        //     int seed = 0xF51BA30E; // this has to stay the same
-        //     checksumIn = hashFactory.newStreamingHash32(seed)
-        //                             .asChecksum();
-        //     checksumOut = hashFactory.newStreamingHash32(seed)
-        //                              .asChecksum();
-        // }
-        //
-        // > Our data is partitioned into 64 blocks which are compressed separately.
-        //
-        // 64 bytes blocks? This would be much too small.
-        //
-        // > Would a larger block size improve the compression ratio?
-        //
-        // Probably.
-        // When compressing data using independent blocks, you can observe compression ratio gains up to 1 MB blocks.
-        // Beyond that, benefits become less and less visible.
-        //
-        // > Is there an optimal block size?
-        //
-        // For independent blocks, 64KB is considered optimal "small" size.
-        // Between 4 KB & 64 KB, it is "very small", but still manageable.
-        // Anything below 4 KB is starting to miss a lot of compression opportunity. Ratio will plummet.
-        //
-        // > Does anybody have recommendations on how to improve the compression ratio?
-        //
-        // Anytime your input data consists of
-        // tables with fixed length cells,
-        // blosc should be tested: it offers big opportunities for compression savings.
-        //
-        // leveldb git:(lz4) ./db_bench
-        // LevelDB:  version 1.7
-        // Keys:     16 bytes each
-        // Values:   100 bytes each (50 bytes after compression)
-        // Entries:  1000000
-        // RawSize:  110.6 MB (estimated)
-        // FileSize: 62.9 MB (estimated)
-        // ------------------------------------------------
-        // snappycomp   : 7.534 micros/op;  518.5 MB/s (output: 55.1%)
-        // snappyuncomp : 1.391 micros/op; 2808.1 MB/s
-        // lz4comp      : 6.210 micros/op;  629.0 MB/s (output: 55.4%)
-        // lz4uncomp    : 0.641 micros/op; 6097.9 MB/s
+        this.aesEngine = new GCMBlockCipher(new AESFastEngine());
     }

-    public
-    void write(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final boolean doCrypto) throws IOException {
+    /**
+     * @param connection if != null, perform crypto on the data.
+     */
+    public synchronized
+    void write(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final Logger logger) throws IOException {
+        // connection will ALWAYS be of type Connection or NULL.
+        // used by RMI/some serializers to determine which connection wrote this object
+        // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
+        this.connection = connection;

         // do we compress + crypto the data? (don't always need it, specifically during connection INIT)
-        if (connection == null || !doCrypto) {
+        if (connection == null) {
             // magic byte
             buffer.writeByte(0);

             // write the object to the NORMAL output buffer!
             writer.setBuffer(buffer);
-
-            // connection will ALWAYS be of type Connection or NULL.
-            // used by RMI/some serializers to determine which connection wrote this object
-            // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
-            this.connection = connection;

             writeClassAndObject(writer, message);
         } else {
+            final boolean traceEnabled = logger.isTraceEnabled();
             byte magicByte = (byte) 0x00000000;

             ByteBuf objectOutputBuffer = this.tempBuffer;
@@ -169,11 +117,6 @@ class KryoExtra extends Kryo {
             // write the object to a TEMP buffer! this will be compressed
             writer.setBuffer(objectOutputBuffer);

-            // connection will ALWAYS be of type Connection or NULL.
-            // used by RMI/some serializers to determine which connection wrote this object
-            // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
-            this.connection = connection;

             writeClassAndObject(writer, message);

             // save off how much data the object took + magic byte
@@ -212,7 +155,7 @@ class KryoExtra extends Kryo {
             }

             if (length > LZ4_COMPRESSION_MIN_SIZE) {
-                if (logger.isTraceEnabled()) {
+                if (traceEnabled) {
                     logger.trace("Compressing data {}", connection);
                 }
@@ -237,7 +180,8 @@ class KryoExtra extends Kryo {
                 // bytes can now be written to, because our compressed data is stored in a temp array.
                 // ONLY do this if our compressed length is LESS than our uncompressed length.
                 if (compressedLength < length) {
-                    // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST version
+                    // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST
+                    // decompress version
                     BigEndian.Int_.toBytes(length, compressOutput);

                     magicByte |= compress;
@@ -251,8 +195,7 @@ class KryoExtra extends Kryo {
                 }
             }

-            if (logger.isTraceEnabled()) {
+            if (traceEnabled) {
                 logger.trace("AES encrypting data {}", connection);
             }
@@ -263,7 +206,7 @@ class KryoExtra extends Kryo {
             aes.reset();
             aes.init(true, connection.getCryptoParameters());

-            byte[] cryptoOutput = this.cryptoOutput;
+            byte[] cryptoOutput;

             // lazy initialize the crypto output buffer
             int cryptoSize = aes.getOutputSize(length);
@@ -273,14 +216,15 @@ class KryoExtra extends Kryo {
                 cryptoOutputLength = cryptoSize;
                 cryptoOutput = new byte[cryptoSize];
                 this.cryptoOutput = cryptoOutput;
+            } else {
+                cryptoOutput = this.cryptoOutput;
             }

-            //System.err.println("out orig: " + length + " first/last " + inputArray[inputOffset] + " " + inputArray[length-1]);
-            int actualLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0);
+            int encryptedLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0);

             try {
                 // authentication tag for GCM
-                actualLength += aes.doFinal(cryptoOutput, actualLength);
+                encryptedLength += aes.doFinal(cryptoOutput, encryptedLength);
             } catch (Exception e) {
                 throw new IOException("Unable to AES encrypt the data", e);
             }
@@ -291,19 +235,27 @@ class KryoExtra extends Kryo {
             buffer.writeByte(magicByte);

             // have to copy over the orig data, because we used the temp buffer
-            buffer.writeBytes(cryptoOutput, 0, actualLength);
+            buffer.writeBytes(cryptoOutput, 0, encryptedLength);
         }
     }

     @SuppressWarnings("Duplicates")
-    public
-    Object read(final ConnectionImpl connection, final ByteBuf buffer, int length) throws IOException {
-        // we cannot use the buffer as a "temp" storage, because there is other data on it
+    public synchronized
+    Object read(final ConnectionImpl connection, final ByteBuf buffer, int length, final Logger logger) throws IOException {
+        // connection will ALWAYS be of type IConnection or NULL.
+        // used by RMI/some serializers to determine which connection read this object
+        // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
+        this.connection = connection;
+
+        ////////////////
+        // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
+        ////////////////
+        ByteBuf inputBuf = buffer;

         // read off the magic byte
         final byte magicByte = buffer.readByte();
-        ByteBuf inputBuf = buffer;

         // compression can ONLY happen if it's ALSO crypto'd
         if ((magicByte & crypto) == crypto) {
@@ -318,7 +270,6 @@ class KryoExtra extends Kryo {
                 logger.trace("AES decrypting data {}", connection);
             }

-            // NOTE: compression and encryption MUST work with byte[] because they use JNI!
             // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use
             // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it...
@@ -349,6 +300,8 @@ class KryoExtra extends Kryo {
                 inputOffset = 0;
             }

+            // have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index.
+            buffer.readerIndex(buffer.readerIndex() + length);

             final GCMBlockCipher aes = this.aesEngine;
             aes.reset();
@@ -357,19 +310,21 @@ class KryoExtra extends Kryo {
             int cryptoSize = aes.getOutputSize(length);

             // lazy initialize the decrypt output buffer
-            byte[] decryptOutputArray = this.decryptOutput;
+            byte[] decryptOutputArray;
             if (cryptoSize > decryptOutputLength) {
                 decryptOutputLength = cryptoSize;
                 decryptOutputArray = new byte[cryptoSize];
                 this.decryptOutput = decryptOutputArray;

                 decryptBuf = Unpooled.wrappedBuffer(decryptOutputArray);
+            } else {
+                decryptOutputArray = this.decryptOutput;
             }

             int decryptedLength = aes.processBytes(inputArray, inputOffset, length, decryptOutputArray, 0);

             try {
-                // authentication tag for GCM (since we are DECRYPTING, the original 'actualLength' is the correct one.)
+                // authentication tag for GCM
                 decryptedLength += aes.doFinal(decryptOutputArray, decryptedLength);
             } catch (Exception e) {
                 throw new IOException("Unable to AES decrypt the data", e);
@@ -403,30 +358,11 @@ class KryoExtra extends Kryo {
             }
         }

         // read the object from the buffer.
         reader.setBuffer(inputBuf);

-        // connection will ALWAYS be of type IConnection or NULL.
-        // used by RMI/some serializers to determine which connection read this object
-        // NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
-        this.connection = connection;
-
-        return readClassAndObject(reader);
-    }
-
-    public final
-    void releaseWrite() {
-        // release/reset connection resources
-        writer.setBuffer(nullByteBuf);
-        connection = null;
-    }
-
-    public final
-    void releaseRead() {
-        // release/reset connection resources
-        reader.setBuffer(nullByteBuf);
-        connection = null;
+        return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer
     }

     @Override