Added logger to debug kryo read/write. Fixed compression buffer.read calculation error
This commit is contained in:
parent
a595ac44d1
commit
050fa81aff
|
@ -22,6 +22,8 @@ import javax.crypto.Cipher;
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
import javax.crypto.spec.GCMParameterSpec;
|
import javax.crypto.spec.GCMParameterSpec;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.Kryo;
|
import com.esotericsoftware.kryo.Kryo;
|
||||||
import com.esotericsoftware.kryo.io.Input;
|
import com.esotericsoftware.kryo.io.Input;
|
||||||
import com.esotericsoftware.kryo.io.Output;
|
import com.esotericsoftware.kryo.io.Output;
|
||||||
|
@ -135,7 +137,6 @@ class KryoExtra extends Kryo {
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
* + class and object bytes +
|
* + class and object bytes +
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public synchronized
|
public synchronized
|
||||||
Object read(final Connection_ connection, final ByteBuf buffer) throws IOException {
|
Object read(final Connection_ connection, final ByteBuf buffer) throws IOException {
|
||||||
|
@ -189,8 +190,8 @@ class KryoExtra extends Kryo {
|
||||||
|
|
||||||
|
|
||||||
public synchronized
|
public synchronized
|
||||||
void writeCompressed(final ByteBuf buffer, final Object message) throws IOException {
|
void writeCompressed(final Logger logger, final ByteBuf buffer, final Object message) throws IOException {
|
||||||
writeCompressed(NOP_CONNECTION, buffer, message);
|
writeCompressed(logger, NOP_CONNECTION, buffer, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -205,7 +206,7 @@ class KryoExtra extends Kryo {
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
*/
|
*/
|
||||||
public synchronized
|
public synchronized
|
||||||
void writeCompressed(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
|
void writeCompressed(final Logger logger, final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
|
||||||
// write the object to a TEMP buffer! this will be compressed later
|
// write the object to a TEMP buffer! this will be compressed later
|
||||||
write(connection, writer, message);
|
write(connection, writer, message);
|
||||||
|
|
||||||
|
@ -224,9 +225,10 @@ class KryoExtra extends Kryo {
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
|
String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
|
||||||
String compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength);
|
String compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength);
|
||||||
System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
|
logger.error(OS.LINE_SEPARATOR +
|
||||||
OS.LINE_SEPARATOR +
|
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
|
||||||
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
|
OS.LINE_SEPARATOR +
|
||||||
|
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
|
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
|
||||||
|
@ -236,9 +238,9 @@ class KryoExtra extends Kryo {
|
||||||
buffer.writeBytes(compressOutput, 0, compressedLength);
|
buffer.writeBytes(compressOutput, 0, compressedLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
public
|
public synchronized
|
||||||
Object readCompressed(final ByteBuf buffer, int length) throws IOException {
|
Object readCompressed(final Logger logger, final ByteBuf buffer, int length) throws IOException {
|
||||||
return readCompressed(NOP_CONNECTION, buffer, length);
|
return readCompressed(logger, NOP_CONNECTION, buffer, length);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -252,8 +254,8 @@ class KryoExtra extends Kryo {
|
||||||
* + class and object bytes +
|
* + class and object bytes +
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
*/
|
*/
|
||||||
public
|
public synchronized
|
||||||
Object readCompressed(final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
|
Object readCompressed(final Logger logger, final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
|
||||||
////////////////
|
////////////////
|
||||||
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
|
// Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it!
|
||||||
////////////////
|
////////////////
|
||||||
|
@ -274,8 +276,7 @@ class KryoExtra extends Kryo {
|
||||||
|
|
||||||
|
|
||||||
///////// decompress data
|
///////// decompress data
|
||||||
buffer.getBytes(start, temp, 0, length);
|
buffer.readBytes(temp, 0, length);
|
||||||
buffer.readerIndex(length);
|
|
||||||
|
|
||||||
|
|
||||||
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
|
// LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor)
|
||||||
|
@ -286,18 +287,14 @@ class KryoExtra extends Kryo {
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
String compressed = ByteBufUtil.hexDump(buffer, start, length);
|
String compressed = ByteBufUtil.hexDump(buffer, start, length);
|
||||||
String orig = ByteBufUtil.hexDump(reader.getBuffer(), start, uncompressedLength);
|
String orig = ByteBufUtil.hexDump(reader.getBuffer(), start, uncompressedLength);
|
||||||
System.err.println("COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
|
logger.error(OS.LINE_SEPARATOR +
|
||||||
OS.LINE_SEPARATOR +
|
"COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed +
|
||||||
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
|
OS.LINE_SEPARATOR +
|
||||||
|
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
|
||||||
}
|
}
|
||||||
|
|
||||||
// read the object from the buffer.
|
// read the object from the buffer.
|
||||||
Object classAndObject = read(connection, reader);
|
return read(connection, reader);
|
||||||
|
|
||||||
if (DEBUG) {
|
|
||||||
System.err.println("Read compress object" + classAndObject.getClass().getSimpleName());
|
|
||||||
}
|
|
||||||
return classAndObject;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -317,7 +314,7 @@ class KryoExtra extends Kryo {
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
*/
|
*/
|
||||||
public synchronized
|
public synchronized
|
||||||
void writeCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
|
void writeCrypto(final Logger logger, final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException {
|
||||||
// write the object to a TEMP buffer! this will be compressed later
|
// write the object to a TEMP buffer! this will be compressed later
|
||||||
write(connection, writer, message);
|
write(connection, writer, message);
|
||||||
|
|
||||||
|
@ -336,9 +333,10 @@ class KryoExtra extends Kryo {
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
|
String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length);
|
||||||
String compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength);
|
String compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength);
|
||||||
System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
|
logger.error(OS.LINE_SEPARATOR +
|
||||||
OS.LINE_SEPARATOR +
|
"ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig +
|
||||||
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
|
OS.LINE_SEPARATOR +
|
||||||
|
"COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
|
// now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version
|
||||||
|
@ -384,9 +382,10 @@ class KryoExtra extends Kryo {
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
|
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
|
||||||
String crypto = ByteBufUtil.hexDump(writer.getBuffer(), 0, encryptedLength);
|
String crypto = ByteBufUtil.hexDump(writer.getBuffer(), 0, encryptedLength);
|
||||||
System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString +
|
logger.error(OS.LINE_SEPARATOR +
|
||||||
OS.LINE_SEPARATOR +
|
"IV: (12)" + OS.LINE_SEPARATOR + ivString +
|
||||||
"CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto);
|
OS.LINE_SEPARATOR +
|
||||||
|
"CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -408,7 +407,7 @@ class KryoExtra extends Kryo {
|
||||||
* ++++++++++++++++++++++++++
|
* ++++++++++++++++++++++++++
|
||||||
*/
|
*/
|
||||||
public
|
public
|
||||||
Object readCrypto(final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
|
Object readCrypto(final Logger logger, final Connection_ connection, final ByteBuf buffer, int length) throws IOException {
|
||||||
// read out the crypto IV
|
// read out the crypto IV
|
||||||
final byte[] iv = new byte[IV_LENGTH_BYTE];
|
final byte[] iv = new byte[IV_LENGTH_BYTE];
|
||||||
buffer.readBytes(iv, 0 , IV_LENGTH_BYTE);
|
buffer.readBytes(iv, 0 , IV_LENGTH_BYTE);
|
||||||
|
@ -431,8 +430,8 @@ class KryoExtra extends Kryo {
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
|
String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE);
|
||||||
String crypto = ByteBufUtil.hexDump(reader.getBuffer(), 0, length);
|
String crypto = ByteBufUtil.hexDump(reader.getBuffer(), 0, length);
|
||||||
System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString +
|
logger.error("IV: (12)" + OS.LINE_SEPARATOR + ivString +
|
||||||
OS.LINE_SEPARATOR + "CRYPTO: (" + length + ")" + OS.LINE_SEPARATOR + crypto);
|
OS.LINE_SEPARATOR + "CRYPTO: (" + length + ")" + OS.LINE_SEPARATOR + crypto);
|
||||||
}
|
}
|
||||||
|
|
||||||
int decryptedLength;
|
int decryptedLength;
|
||||||
|
@ -459,9 +458,9 @@ class KryoExtra extends Kryo {
|
||||||
int endWithoutUncompressedLength = decryptedLength - start;
|
int endWithoutUncompressedLength = decryptedLength - start;
|
||||||
String compressed = ByteBufUtil.hexDump(temp, start, endWithoutUncompressedLength);
|
String compressed = ByteBufUtil.hexDump(temp, start, endWithoutUncompressedLength);
|
||||||
String orig = ByteBufUtil.hexDump(reader.getBuffer(), 0, uncompressedLength);
|
String orig = ByteBufUtil.hexDump(reader.getBuffer(), 0, uncompressedLength);
|
||||||
System.err.println("COMPRESSED: (" + endWithoutUncompressedLength + ")" + OS.LINE_SEPARATOR + compressed +
|
logger.error("COMPRESSED: (" + endWithoutUncompressedLength + ")" + OS.LINE_SEPARATOR + compressed +
|
||||||
OS.LINE_SEPARATOR +
|
OS.LINE_SEPARATOR +
|
||||||
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
|
"ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig);
|
||||||
}
|
}
|
||||||
|
|
||||||
// read the object from the buffer.
|
// read the object from the buffer.
|
||||||
|
|
|
@ -19,6 +19,7 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
|
import org.bouncycastle.crypto.params.ECPrivateKeyParameters;
|
||||||
|
@ -133,6 +134,18 @@ class Serialization implements NetworkSerializationManager {
|
||||||
|
|
||||||
serialization.register(Arrays.asList().getClass());
|
serialization.register(Arrays.asList().getClass());
|
||||||
|
|
||||||
|
serialization.register(Collections.emptyList().getClass());
|
||||||
|
|
||||||
|
serialization.register(Collections.emptySet().getClass());
|
||||||
|
serialization.register(Collections.emptyNavigableSet().getClass());
|
||||||
|
serialization.register(Collections.emptySortedSet().getClass());
|
||||||
|
|
||||||
|
serialization.register(Collections.emptyMap().getClass());
|
||||||
|
serialization.register(Collections.emptyNavigableMap().getClass());
|
||||||
|
serialization.register(Collections.emptySortedMap().getClass());
|
||||||
|
|
||||||
|
// java.util.Collections$EmptyList
|
||||||
|
|
||||||
// hacky way to register unmodifiable serializers
|
// hacky way to register unmodifiable serializers
|
||||||
Kryo kryo = new Kryo() {
|
Kryo kryo = new Kryo() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -508,7 +521,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
|
|
||||||
kryo.setRegistrationRequired(false);
|
kryo.setRegistrationRequired(false);
|
||||||
try {
|
try {
|
||||||
kryo.writeCompressed(buffer, registrationDetails);
|
kryo.writeCompressed(logger, buffer, registrationDetails);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Unable to write compressed data for registration details");
|
logger.error("Unable to write compressed data for registration details");
|
||||||
}
|
}
|
||||||
|
@ -568,7 +581,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
try {
|
try {
|
||||||
kryo.setRegistrationRequired(false);
|
kryo.setRegistrationRequired(false);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Object[][] classRegistrations = (Object[][]) kryo.readCompressed(byteBuf, otherRegistrationData.length);
|
Object[][] classRegistrations = (Object[][]) kryo.readCompressed(logger, byteBuf, otherRegistrationData.length);
|
||||||
|
|
||||||
|
|
||||||
int lengthOrg = mergedRegistrations.length;
|
int lengthOrg = mergedRegistrations.length;
|
||||||
|
@ -785,13 +798,13 @@ class Serialization implements NetworkSerializationManager {
|
||||||
try {
|
try {
|
||||||
if (wireWriteLogger.isTraceEnabled()) {
|
if (wireWriteLogger.isTraceEnabled()) {
|
||||||
int start = buffer.writerIndex();
|
int start = buffer.writerIndex();
|
||||||
kryo.writeCompressed(connection, buffer, message);
|
kryo.writeCompressed(wireWriteLogger, connection, buffer, message);
|
||||||
int end = buffer.writerIndex();
|
int end = buffer.writerIndex();
|
||||||
|
|
||||||
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
kryo.writeCompressed(connection, buffer, message);
|
kryo.writeCompressed(wireWriteLogger, connection, buffer, message);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
|
@ -863,7 +876,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
try {
|
try {
|
||||||
if (wireReadLogger.isTraceEnabled()) {
|
if (wireReadLogger.isTraceEnabled()) {
|
||||||
int start = buffer.readerIndex();
|
int start = buffer.readerIndex();
|
||||||
Object object = kryo.readCompressed(connection, buffer, length);
|
Object object = kryo.readCompressed(wireReadLogger, connection, buffer, length);
|
||||||
int end = buffer.readerIndex();
|
int end = buffer.readerIndex();
|
||||||
|
|
||||||
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
||||||
|
@ -871,7 +884,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
return object;
|
return object;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return kryo.readCompressed(connection, buffer, length);
|
return kryo.readCompressed(wireReadLogger, connection, buffer, length);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
|
@ -890,13 +903,13 @@ class Serialization implements NetworkSerializationManager {
|
||||||
try {
|
try {
|
||||||
if (wireWriteLogger.isTraceEnabled()) {
|
if (wireWriteLogger.isTraceEnabled()) {
|
||||||
int start = buffer.writerIndex();
|
int start = buffer.writerIndex();
|
||||||
kryo.writeCrypto(connection, buffer, message);
|
kryo.writeCrypto(wireWriteLogger, connection, buffer, message);
|
||||||
int end = buffer.writerIndex();
|
int end = buffer.writerIndex();
|
||||||
|
|
||||||
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
kryo.writeCrypto(connection, buffer, message);
|
kryo.writeCrypto(wireWriteLogger, connection, buffer, message);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
|
@ -917,7 +930,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
try {
|
try {
|
||||||
if (wireReadLogger.isTraceEnabled()) {
|
if (wireReadLogger.isTraceEnabled()) {
|
||||||
int start = buffer.readerIndex();
|
int start = buffer.readerIndex();
|
||||||
Object object = kryo.readCrypto(connection, buffer, length);
|
Object object = kryo.readCrypto(wireReadLogger, connection, buffer, length);
|
||||||
int end = buffer.readerIndex();
|
int end = buffer.readerIndex();
|
||||||
|
|
||||||
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start));
|
||||||
|
@ -925,7 +938,7 @@ class Serialization implements NetworkSerializationManager {
|
||||||
return object;
|
return object;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return kryo.readCrypto(connection, buffer, length);
|
return kryo.readCrypto(wireReadLogger, connection, buffer, length);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
kryoPool.put(kryo);
|
kryoPool.put(kryo);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user