Code polish. Removed optional compression (because for small data, it doesn't matter with AES padding)
This commit is contained in:
parent
81675fbd3d
commit
e1942ae03f
@ -470,23 +470,7 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
|
|||||||
void write(final ByteBuf buffer, final Object message) throws IOException {
|
void write(final ByteBuf buffer, final Object message) throws IOException {
|
||||||
final KryoExtra kryo = kryoPool.take();
|
final KryoExtra kryo = kryoPool.take();
|
||||||
try {
|
try {
|
||||||
kryo.write(null, buffer, message);
|
kryo.write(buffer, message);
|
||||||
} finally {
|
|
||||||
kryoPool.put(kryo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
|
|
||||||
* <p>
|
|
||||||
* There is a small speed penalty if there were no kryo's available to use.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public final
|
|
||||||
void writeWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
|
|
||||||
final KryoExtra kryo = kryoPool.take();
|
|
||||||
try {
|
|
||||||
kryo.writeCrypto(connection, buffer, message, logger);
|
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
}
|
}
|
||||||
@ -504,7 +488,23 @@ class KryoCryptoSerializationManager implements CryptoSerializationManager {
|
|||||||
Object read(final ByteBuf buffer, final int length) throws IOException {
|
Object read(final ByteBuf buffer, final int length) throws IOException {
|
||||||
final KryoExtra kryo = kryoPool.take();
|
final KryoExtra kryo = kryoPool.take();
|
||||||
try {
|
try {
|
||||||
return kryo.read(null, buffer);
|
return kryo.read(buffer);
|
||||||
|
} finally {
|
||||||
|
kryoPool.put(kryo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until a kryo is available to write, using CAS operations to prevent having to synchronize.
|
||||||
|
* <p>
|
||||||
|
* There is a small speed penalty if there were no kryo's available to use.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public final
|
||||||
|
void writeWithCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
|
||||||
|
final KryoExtra kryo = kryoPool.take();
|
||||||
|
try {
|
||||||
|
kryo.writeCrypto(connection, buffer, message, logger);
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
}
|
}
|
||||||
|
@ -35,13 +35,9 @@ import java.io.IOException;
|
|||||||
*/
|
*/
|
||||||
public
|
public
|
||||||
class KryoExtra extends Kryo {
|
class KryoExtra extends Kryo {
|
||||||
// this is the minimum size that LZ4 can compress. Anything smaller than this will result in an output LARGER than the input.
|
|
||||||
private static final int LZ4_COMPRESSION_MIN_SIZE = 32;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* bit masks
|
* bit masks
|
||||||
*/
|
*/
|
||||||
private static final byte compress = (byte) 1;
|
|
||||||
static final byte crypto = (byte) (1 << 1);
|
static final byte crypto = (byte) (1 << 1);
|
||||||
|
|
||||||
// snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
|
// snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%)
|
||||||
@ -88,11 +84,9 @@ class KryoExtra extends Kryo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized
|
public synchronized
|
||||||
void write(final ConnectionImpl connection, final ByteBuf buffer, final Object message) throws IOException {
|
void write(final ByteBuf buffer, final Object message) throws IOException {
|
||||||
// connection will ALWAYS be of type Connection or NULL.
|
// connection will always be NULL during connection initialization
|
||||||
// used by RMI/some serializers to determine which connection wrote this object
|
this.connection = null;
|
||||||
// NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
|
|
||||||
this.connection = connection;
|
|
||||||
|
|
||||||
// during INIT and handshake, we don't use connection encryption/compression
|
// during INIT and handshake, we don't use connection encryption/compression
|
||||||
// magic byte
|
// magic byte
|
||||||
@ -105,11 +99,9 @@ class KryoExtra extends Kryo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized
|
public synchronized
|
||||||
Object read(final ConnectionImpl connection, final ByteBuf buffer) throws IOException {
|
Object read(final ByteBuf buffer) throws IOException {
|
||||||
// connection will ALWAYS be of type IConnection or NULL.
|
// connection will always be NULL during connection initialization
|
||||||
// used by RMI/some serializers to determine which connection read this object
|
this.connection = null;
|
||||||
// NOTE: this is only valid in the context of this thread, which RMI stuff is accessed in -- so this is SAFE for RMI
|
|
||||||
this.connection = connection;
|
|
||||||
|
|
||||||
|
|
||||||
////////////////
|
////////////////
|
||||||
@ -131,7 +123,6 @@ class KryoExtra extends Kryo {
|
|||||||
public synchronized
|
public synchronized
|
||||||
void writeCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final Logger logger) throws IOException {
|
void writeCrypto(final ConnectionImpl connection, final ByteBuf buffer, final Object message, final Logger logger) throws IOException {
|
||||||
final boolean traceEnabled = logger.isTraceEnabled();
|
final boolean traceEnabled = logger.isTraceEnabled();
|
||||||
byte magicByte = (byte) 0x00000000;
|
|
||||||
|
|
||||||
ByteBuf objectOutputBuffer = this.tempBuffer;
|
ByteBuf objectOutputBuffer = this.tempBuffer;
|
||||||
objectOutputBuffer.clear(); // always have to reset everything
|
objectOutputBuffer.clear(); // always have to reset everything
|
||||||
@ -176,7 +167,9 @@ class KryoExtra extends Kryo {
|
|||||||
inputOffset = 0;
|
inputOffset = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (length > LZ4_COMPRESSION_MIN_SIZE) {
|
// we AWALYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger
|
||||||
|
// output), will be negated by the increase in size by the encryption
|
||||||
|
|
||||||
if (traceEnabled) {
|
if (traceEnabled) {
|
||||||
logger.trace("Compressing data {}", connection);
|
logger.trace("Compressing data {}", connection);
|
||||||
}
|
}
|
||||||
@ -200,13 +193,9 @@ class KryoExtra extends Kryo {
|
|||||||
int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, 4, maxCompressedLength);
|
int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, 4, maxCompressedLength);
|
||||||
|
|
||||||
// bytes can now be written to, because our compressed data is stored in a temp array.
|
// bytes can now be written to, because our compressed data is stored in a temp array.
|
||||||
// ONLY do this if our compressed length is LESS than our uncompressed length.
|
|
||||||
if (compressedLength < length) {
|
|
||||||
// now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST
|
|
||||||
// decompress version
|
|
||||||
BigEndian.Int_.toBytes(length, compressOutput);
|
|
||||||
|
|
||||||
magicByte |= compress;
|
// now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST decompress version
|
||||||
|
BigEndian.Int_.toBytes(length, compressOutput);
|
||||||
|
|
||||||
// corrected length
|
// corrected length
|
||||||
length = compressedLength + 4; // +4 for the uncompressed size bytes
|
length = compressedLength + 4; // +4 for the uncompressed size bytes
|
||||||
@ -214,8 +203,6 @@ class KryoExtra extends Kryo {
|
|||||||
// correct input. compression output is now encryption input
|
// correct input. compression output is now encryption input
|
||||||
inputArray = compressOutput;
|
inputArray = compressOutput;
|
||||||
inputOffset = 0;
|
inputOffset = 0;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (traceEnabled) {
|
if (traceEnabled) {
|
||||||
logger.trace("AES encrypting data {}", connection);
|
logger.trace("AES encrypting data {}", connection);
|
||||||
@ -251,10 +238,8 @@ class KryoExtra extends Kryo {
|
|||||||
throw new IOException("Unable to AES encrypt the data", e);
|
throw new IOException("Unable to AES encrypt the data", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
magicByte |= crypto;
|
|
||||||
|
|
||||||
// write out the "magic" byte.
|
// write out the "magic" byte.
|
||||||
buffer.writeByte(magicByte);
|
buffer.writeByte(crypto);
|
||||||
|
|
||||||
// have to copy over the orig data, because we used the temp buffer
|
// have to copy over the orig data, because we used the temp buffer
|
||||||
buffer.writeBytes(cryptoOutput, 0, encryptedLength);
|
buffer.writeBytes(cryptoOutput, 0, encryptedLength);
|
||||||
@ -282,10 +267,6 @@ class KryoExtra extends Kryo {
|
|||||||
// have to adjust for the magic byte
|
// have to adjust for the magic byte
|
||||||
length -= 1;
|
length -= 1;
|
||||||
|
|
||||||
// it's ALWAYS encrypted, but MAYBE compressed
|
|
||||||
final boolean isCompressed = (magicByte & compress) == compress;
|
|
||||||
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("AES decrypting data {}", connection);
|
logger.trace("AES decrypting data {}", connection);
|
||||||
}
|
}
|
||||||
@ -350,7 +331,7 @@ class KryoExtra extends Kryo {
|
|||||||
throw new IOException("Unable to AES decrypt the data", e);
|
throw new IOException("Unable to AES decrypt the data", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isCompressed) {
|
// decompress -- as it's ALWAYS compressed
|
||||||
inputArray = decryptOutputArray;
|
inputArray = decryptOutputArray;
|
||||||
inputOffset = 4; // because 4 bytes for the decompressed size
|
inputOffset = 4; // because 4 bytes for the decompressed size
|
||||||
|
|
||||||
@ -371,11 +352,6 @@ class KryoExtra extends Kryo {
|
|||||||
|
|
||||||
decompressBuf.setIndex(0, uncompressedLength);
|
decompressBuf.setIndex(0, uncompressedLength);
|
||||||
inputBuf = decompressBuf;
|
inputBuf = decompressBuf;
|
||||||
}
|
|
||||||
else {
|
|
||||||
decryptBuf.setIndex(0, decryptedLength);
|
|
||||||
inputBuf = decryptBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
// read the object from the buffer.
|
// read the object from the buffer.
|
||||||
reader.setBuffer(inputBuf);
|
reader.setBuffer(inputBuf);
|
||||||
|
Loading…
Reference in New Issue
Block a user