diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index 7ad39b22..64ea0206 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.Client; import dorkbox.network.connection.bridge.ConnectionBridge; @@ -192,26 +192,24 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_ } /** - * @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal - * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't - * clobber each other + * @return the AES key. key=32 byte, iv=12 bytes (AES-GCM implementation). */ @Override public final - ParametersWithIV getCryptoParameters() { - return this.channelWrapper.cryptoParameters(); + SecretKey cryptoKey() { + return this.channelWrapper.cryptoKey(); } /** * This is the per-message sequence number. * - * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) + * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) */ @Override public final - long getNextGcmSequence() { + long nextGcmSequence() { return aes_gcm_iv.getAndIncrement(); } diff --git a/src/dorkbox/network/connection/Connection_.java b/src/dorkbox/network/connection/Connection_.java index d643475e..809aaf78 100644 --- a/src/dorkbox/network/connection/Connection_.java +++ b/src/dorkbox/network/connection/Connection_.java @@ -15,7 +15,7 @@ */ package dorkbox.network.connection; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.rmi.ConnectionRmiSupport; @@ -33,19 +33,17 @@ interface Connection_ extends Connection { /** * This is the per-message sequence number. * - * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 8 (external counter) + 4 (GCM counter) + * The IV for AES-GCM must be 12 bytes, since it's 4 (salt) + 4 (external counter) + 4 (GCM counter) * The 12 bytes IV is created during connection registration, and during the AES-GCM crypto, we override the last 8 with this * counter, which is also transmitted as an optimized int. (which is why it starts at 0, so the transmitted bytes are small) */ - long getNextGcmSequence(); + long nextGcmSequence(); /** - * @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal - * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't - * clobber each other + * @return the AES key. */ - ParametersWithIV getCryptoParameters(); + SecretKey cryptoKey(); /** * @return the endpoint associated with this connection diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index eddc245f..acb61b31 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -16,23 +16,26 @@ package dorkbox.network.connection; import java.io.IOException; +import java.security.SecureRandom; -import org.bouncycastle.crypto.engines.AESFastEngine; -import org.bouncycastle.crypto.modes.GCMBlockCipher; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import javax.crypto.spec.GCMParameterSpec; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import dorkbox.network.pipeline.ByteBufInput; import dorkbox.network.pipeline.ByteBufOutput; import dorkbox.network.rmi.ConnectionRmiSupport; import dorkbox.network.rmi.RmiNopConnection; import dorkbox.network.serialization.NetworkSerializationManager; -import dorkbox.util.bytes.BigEndian; +import dorkbox.util.OS; import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.bytes.OptimizeUtilsByteBuf; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBufUtil; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FastDecompressor; @@ -43,6 +46,9 @@ import net.jpountz.lz4.LZ4FastDecompressor; @SuppressWarnings("Duplicates") public class KryoExtra extends Kryo { + private static final int ABSOLUTE_MAX_SIZE_OBJECT = EndPoint.udpMaxSize * 1000; // by default, this is about 500k + private static final boolean DEBUG = false; + private static final Connection_ NOP_CONNECTION = new RmiNopConnection(); // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) @@ -52,39 +58,31 @@ class KryoExtra extends Kryo { private static final LZ4Factory factory = LZ4Factory.fastestInstance(); // for kryo serialization - private final ByteBufInput reader = new ByteBufInput(); - private final ByteBufOutput writer = new ByteBufOutput(); + private final ByteBufInput readerBuff = new ByteBufInput(); + private final ByteBufOutput writerBuff = new ByteBufOutput(); + + // crypto + compression have to work with native byte arrays, so here we go... + private final Input reader = new Input(ABSOLUTE_MAX_SIZE_OBJECT); + private final Output writer = new Output(ABSOLUTE_MAX_SIZE_OBJECT); + + private final byte[] temp = new byte[ABSOLUTE_MAX_SIZE_OBJECT]; + // volatile to provide object visibility for entire class. This is unique per connection public volatile ConnectionRmiSupport rmiSupport; - private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine()); + private static final String ALGORITHM = "AES/GCM/NoPadding"; + private static final int TAG_LENGTH_BIT = 128; + private static final int IV_LENGTH_BYTE = 12; + private final SecureRandom secureRandom = new SecureRandom(); + private final Cipher cipher; - // writing data - private final ByteBuf tempBuffer = Unpooled.buffer(EndPoint.udpMaxSize); private LZ4Compressor compressor = factory.fastCompressor(); - - private int inputArrayLength = -1; - private byte[] inputArray; - - private int compressOutputLength = -1; - private byte[] compressOutput; - - private int cryptoOutputLength = -1; - private byte[] cryptoOutput; - - - // reading data private LZ4FastDecompressor decompressor = factory.fastDecompressor(); - private int decryptOutputLength = -1; - private byte[] decryptOutput; - private ByteBuf decryptBuf; - private int decompressOutputLength = -1; - private byte[] decompressOutput; - private ByteBuf decompressBuf; + private NetworkSerializationManager serializationManager; @@ -93,499 +91,393 @@ class KryoExtra extends Kryo { super(); this.serializationManager = serializationManager; + + try { + cipher = Cipher.getInstance(ALGORITHM); + } catch (Exception e) { + throw new IllegalStateException("could not get cipher instance", e); + } } + /** + * This is NOT ENCRYPTED + */ public synchronized void write(final ByteBuf buffer, final Object message) throws IOException { - // these will always be NULL during connection initialization - this.rmiSupport = null; + write(NOP_CONNECTION, buffer, message); + } + + /** + * This is NOT ENCRYPTED + */ + public synchronized + void write(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.rmiSupport = connection.rmiSupport(); // write the object to the NORMAL output buffer! - writer.setBuffer(buffer); + writerBuff.setBuffer(buffer); + + writeClassAndObject(writerBuff, message); + } + + /** + * This is NOT ENCRYPTED + * + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + public synchronized + Object read(final ByteBuf buffer) throws IOException { + return read(NOP_CONNECTION, buffer); + } + + /** + * This is NOT ENCRYPTED + * + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + * + */ + public synchronized + Object read(final Connection_ connection, final ByteBuf buffer) throws IOException { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.rmiSupport = connection.rmiSupport(); + + // read the object from the buffer. + readerBuff.setBuffer(buffer); + + return readClassAndObject(readerBuff); // this properly sets the readerIndex, but only if it's the correct buffer + } + + //////////////// + //////////////// + //////////////// + // for more complicated writes, sadly, we have to deal DIRECTLY with byte arrays + //////////////// + //////////////// + //////////////// + + /** + * OUTPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + private + void write(final Connection_ connection, final Output writer, final Object message) { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.rmiSupport = connection.rmiSupport(); + + // write the object to the NORMAL output buffer! + writer.reset(); writeClassAndObject(writer, message); } - public synchronized - Object read(final ByteBuf buffer) throws IOException { - // these will always be NULL during connection initialization - this.rmiSupport = null; + /** + * INPUT: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ + private + Object read(final Connection_ connection, final Input reader) { + // required by RMI and some serializers to determine which connection wrote (or has info about) this object + this.rmiSupport = connection.rmiSupport(); - //////////////// - // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! - //////////////// - - ByteBuf inputBuf = buffer; - - // read the object from the buffer. - reader.setBuffer(inputBuf); - - return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer + return readClassAndObject(reader); } - /** - * This is NOT ENCRYPTED (and is only done on the loopback connection!) - */ + public synchronized void writeCompressed(final ByteBuf buffer, final Object message) throws IOException { writeCompressed(NOP_CONNECTION, buffer, message); } /** - * This is NOT ENCRYPTED (and is only done on the loopback connection!) + * BUFFER: + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + uncompressed length (1-4 bytes) + compressed data + + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * COMPRESSED DATA: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ */ public synchronized void writeCompressed(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.rmiSupport = connection.rmiSupport(); + // write the object to a TEMP buffer! this will be compressed later + write(connection, writer, message); - ByteBuf objectOutputBuffer = this.tempBuffer; - objectOutputBuffer.clear(); // always have to reset everything - - // write the object to a TEMP buffer! this will be compressed - writer.setBuffer(objectOutputBuffer); - - writeClassAndObject(writer, message); - - // save off how much data the object took + magic byte - int length = objectOutputBuffer.writerIndex(); - - // NOTE: compression and encryption MUST work with byte[] because they use JNI! - // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use - // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... - // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf - - byte[] inputArray; - int inputOffset; - - // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because - // the buffer might be a slice of other buffer or a pooled buffer: - //noinspection Duplicates - if (objectOutputBuffer.hasArray() && - objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) && - objectOutputBuffer.array().length == objectOutputBuffer.capacity()) { - - // we can use it... - inputArray = objectOutputBuffer.array(); - inputArrayLength = -1; // this is so we don't REUSE this array accidentally! - inputOffset = objectOutputBuffer.arrayOffset(); - } - else { - // we can NOT use it. - if (length > inputArrayLength) { - inputArrayLength = length; - inputArray = new byte[length]; - this.inputArray = inputArray; - } - else { - inputArray = this.inputArray; - } - - objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length); - inputOffset = 0; - } + // save off how much data the object took + int length = writer.position(); + int maxCompressedLength = compressor.maxCompressedLength(length); ////////// compressing data // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // output), will be negated by the increase in size by the encryption + byte[] compressOutput = temp; - byte[] compressOutput = this.compressOutput; + // LZ4 compress. + int compressedLength = compressor.compress(writer.getBuffer(), 0, length, compressOutput, 0, maxCompressedLength); - int maxLengthLengthOffset = 4; // length is never negative, so 4 is OK (5 means it's negative) - int maxCompressedLength = compressor.maxCompressedLength(length); - - // add 4 so there is room to write the compressed size to the buffer - int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset; - - // lazy initialize the compression output buffer - if (maxCompressedLengthWithOffset > compressOutputLength) { - compressOutputLength = maxCompressedLengthWithOffset; - compressOutput = new byte[maxCompressedLengthWithOffset]; - this.compressOutput = compressOutput; + if (DEBUG) { + String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length); + String compressed = ByteBufUtil.hexDump(compressOutput, 0, compressedLength); + System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + + OS.LINE_SEPARATOR + + "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed); } - - // LZ4 compress. output offset max 4 bytes to leave room for length of tempOutput data - int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength); - - // bytes can now be written to, because our compressed data is stored in a temp array. - - final int lengthLength = OptimizeUtilsByteArray.intLength(length, true); - - // correct input. compression output is now buffer input - inputArray = compressOutput; - inputOffset = maxLengthLengthOffset - lengthLength; - - - // now write the ORIGINAL (uncompressed) length to the front of the byte array (this is NOT THE BUFFER!). This is so we can use the FAST decompress version - OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset); + // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version + OptimizeUtilsByteBuf.writeInt(buffer, length, true); // have to copy over the orig data, because we used the temp buffer. Also have to account for the length of the uncompressed size - buffer.writeBytes(inputArray, inputOffset, compressedLength + lengthLength); + buffer.writeBytes(compressOutput, 0, compressedLength); } - /** - * This is NOT ENCRYPTED (and is only done on the loopback connection!) - */ public Object readCompressed(final ByteBuf buffer, int length) throws IOException { return readCompressed(NOP_CONNECTION, buffer, length); } /** - * This is NOT ENCRYPTED (and is only done on the loopback connection!) + * BUFFER: + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + uncompressed length (1-4 bytes) + compressed data + + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * COMPRESSED DATA: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ */ public Object readCompressed(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.rmiSupport = connection.rmiSupport(); - - //////////////// // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! //////////////// - ByteBuf inputBuf = buffer; - // get the decompressed length (at the beginning of the array) final int uncompressedLength = OptimizeUtilsByteBuf.readInt(buffer, true); - final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-5 bytes for the decompressed size + if (uncompressedLength > ABSOLUTE_MAX_SIZE_OBJECT) { + throw new IOException("Uncompressed size (" + uncompressedLength + ") is larger than max allowed size (" + ABSOLUTE_MAX_SIZE_OBJECT + ")!"); + } - // have to adjust for uncompressed length + // because 1-4 bytes for the decompressed size (this number is never negative) + final int lengthLength = OptimizeUtilsByteArray.intLength(uncompressedLength, true); + + int start = buffer.readerIndex(); + + // have to adjust for uncompressed length-length length = length - lengthLength; - ///////// decompress data -- as it's ALWAYS compressed + ///////// decompress data + buffer.getBytes(start, temp, 0, length); + buffer.readerIndex(length); - // NOTE: compression and encryption MUST work with byte[] because they use JNI! - // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use - // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... - // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf - - byte[] inputArray; - int inputOffset; - - // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because - // the buffer might be a slice of other buffer or a pooled buffer: - //noinspection Duplicates - if (inputBuf.hasArray() && - inputBuf.array()[0] == inputBuf.getByte(0) && - inputBuf.array().length == inputBuf.capacity()) { - - // we can use it... - inputArray = inputBuf.array(); - inputArrayLength = -1; // this is so we don't REUSE this array accidentally! - inputOffset = inputBuf.arrayOffset() + lengthLength; - } - else { - // we can NOT use it. - if (length > inputArrayLength) { - inputArrayLength = length; - inputArray = new byte[length]; - this.inputArray = inputArray; - } - else { - inputArray = this.inputArray; - } - - inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length); - inputOffset = 0; - } - - // have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index. - buffer.readerIndex(buffer.readerIndex() + length); - - - ///////// decompress data -- as it's ALWAYS compressed - - byte[] decompressOutputArray = this.decompressOutput; - if (uncompressedLength > decompressOutputLength) { - decompressOutputLength = uncompressedLength; - decompressOutputArray = new byte[uncompressedLength]; - this.decompressOutput = decompressOutputArray; - - decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo - } - inputBuf = decompressBuf; // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor) - decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength); - - inputBuf.setIndex(0, uncompressedLength); + reader.reset(); + decompressor.decompress(temp, 0, reader.getBuffer(), 0, uncompressedLength); + reader.setLimit(uncompressedLength); + if (DEBUG) { + String compressed = ByteBufUtil.hexDump(buffer, start, length); + String orig = ByteBufUtil.hexDump(reader.getBuffer(), start, uncompressedLength); + System.err.println("COMPRESSED: (" + length + ")" + OS.LINE_SEPARATOR + compressed + + OS.LINE_SEPARATOR + + "ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig); + } // read the object from the buffer. - reader.setBuffer(inputBuf); + Object classAndObject = read(connection, reader); - return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer + if (DEBUG) { + System.err.println("Read compress object" + classAndObject.getClass().getSimpleName()); + } + return classAndObject; } + /** + * BUFFER: + * +++++++++++++++++++++++++++++++ + * + IV (12) + encrypted data + + * +++++++++++++++++++++++++++++++ + * + * ENCRYPTED DATA: + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + uncompressed length (1-4 bytes) + compressed data + + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * COMPRESSED DATA: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ public synchronized void writeCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.rmiSupport = connection.rmiSupport(); - - ByteBuf objectOutputBuffer = this.tempBuffer; - objectOutputBuffer.clear(); // always have to reset everything - - // write the object to a TEMP buffer! this will be compressed - writer.setBuffer(objectOutputBuffer); - - writeClassAndObject(writer, message); + // write the object to a TEMP buffer! this will be compressed later + write(connection, writer, message); // save off how much data the object took - int length = objectOutputBuffer.writerIndex(); - - - // NOTE: compression and encryption MUST work with byte[] because they use JNI! - // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use - // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... - // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf - - byte[] inputArray; - int inputOffset; - - // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because - // the buffer might be a slice of other buffer or a pooled buffer: - //noinspection Duplicates - if (objectOutputBuffer.hasArray() && - objectOutputBuffer.array()[0] == objectOutputBuffer.getByte(0) && - objectOutputBuffer.array().length == objectOutputBuffer.capacity()) { - - // we can use it... - inputArray = objectOutputBuffer.array(); - inputArrayLength = -1; // this is so we don't REUSE this array accidentally! - inputOffset = objectOutputBuffer.arrayOffset(); - } - else { - // we can NOT use it. - if (length > inputArrayLength) { - inputArrayLength = length; - inputArray = new byte[length]; - this.inputArray = inputArray; - } - else { - inputArray = this.inputArray; - } - - objectOutputBuffer.getBytes(objectOutputBuffer.readerIndex(), inputArray, 0, length); - inputOffset = 0; - } - + int length = writer.position(); + int maxCompressedLength = compressor.maxCompressedLength(length); ////////// compressing data // we ALWAYS compress our data stream -- because of how AES-GCM pads data out, the small input (that would result in a larger // output), will be negated by the increase in size by the encryption + byte[] compressOutput = temp; - byte[] compressOutput = this.compressOutput; + // LZ4 compress. Offset by 4 in the dest array so we have room for the length + int compressedLength = compressor.compress(writer.getBuffer(), 0, length, compressOutput, 4, maxCompressedLength); - int maxLengthLengthOffset = 4; // length is never negative, so 4 is OK (5 means it's negative) - int maxCompressedLength = compressor.maxCompressedLength(length); - - // add 4 so there is room to write the compressed size to the buffer - int maxCompressedLengthWithOffset = maxCompressedLength + maxLengthLengthOffset; - - // lazy initialize the compression output buffer - if (maxCompressedLengthWithOffset > compressOutputLength) { - compressOutputLength = maxCompressedLengthWithOffset; - compressOutput = new byte[maxCompressedLengthWithOffset]; - this.compressOutput = compressOutput; + if (DEBUG) { + String orig = ByteBufUtil.hexDump(writer.getBuffer(), 0, length); + String compressed = ByteBufUtil.hexDump(compressOutput, 4, compressedLength); + System.err.println("ORIG: (" + length + ")" + OS.LINE_SEPARATOR + orig + + OS.LINE_SEPARATOR + + "COMPRESSED: (" + compressedLength + ")" + OS.LINE_SEPARATOR + compressed); } - - - // LZ4 compress. output offset max 4 bytes to leave room for length of tempOutput data - int compressedLength = compressor.compress(inputArray, inputOffset, length, compressOutput, maxLengthLengthOffset, maxCompressedLength); - - // bytes can now be written to, because our compressed data is stored in a temp array. - + // now write the ORIGINAL (uncompressed) length. This is so we can use the FAST decompress version final int lengthLength = OptimizeUtilsByteArray.intLength(length, true); - // correct input. compression output is now encryption input - inputArray = compressOutput; - inputOffset = maxLengthLengthOffset - lengthLength; + // this is where we start writing the length data, so that the end of this lines up with the compressed data + int start = 4 - lengthLength; + OptimizeUtilsByteArray.writeInt(compressOutput, length, true, start); - // now write the ORIGINAL (uncompressed) length to the front of the byte array. This is so we can use the FAST decompress version - OptimizeUtilsByteArray.writeInt(inputArray, length, true, inputOffset); - - // correct length for encryption - length = compressedLength + lengthLength; // +1 to +4 for the uncompressed size bytes - + // now compressOutput contains "uncompressed length + data" + int compressedArrayLength = lengthLength + compressedLength; /////// encrypting data. - final long nextGcmSequence = connection.getNextGcmSequence(); + final SecretKey cryptoKey = connection.cryptoKey(); - // this is a threadlocal, so that we don't clobber other threads that are performing crypto on the same connection at the same time - final ParametersWithIV cryptoParameters = connection.getCryptoParameters(); - BigEndian.Long_.toBytes(nextGcmSequence, cryptoParameters.getIV(), 4); // put our counter into the IV - - final GCMBlockCipher aes = this.aesEngine; - aes.reset(); - aes.init(true, cryptoParameters); - - byte[] cryptoOutput; - - // lazy initialize the crypto output buffer - int cryptoSize = length + 16; // from: aes.getOutputSize(length); - - // 'output' is the temp byte array - if (cryptoSize > cryptoOutputLength) { - cryptoOutputLength = cryptoSize; - cryptoOutput = new byte[cryptoSize]; - this.cryptoOutput = cryptoOutput; - } else { - cryptoOutput = this.cryptoOutput; - } - - int encryptedLength = aes.processBytes(inputArray, inputOffset, length, cryptoOutput, 0); + byte[] iv = new byte[IV_LENGTH_BYTE]; // NEVER REUSE THIS IV WITH SAME KEY + secureRandom.nextBytes(iv); + GCMParameterSpec parameterSpec = new GCMParameterSpec(TAG_LENGTH_BIT, iv); // 128 bit auth tag length try { - // authentication tag for GCM - encryptedLength += aes.doFinal(cryptoOutput, encryptedLength); + cipher.init(Cipher.ENCRYPT_MODE, cryptoKey, parameterSpec); } catch (Exception e) { throw new IOException("Unable to AES encrypt the data", e); } - // write out our GCM counter - OptimizeUtilsByteBuf.writeLong(buffer, nextGcmSequence, true); + // we REUSE the writer buffer! (since that data is now compressed in a different array) + + int encryptedLength; + try { + encryptedLength = cipher.doFinal(compressOutput, start, compressedArrayLength, writer.getBuffer(), 0); + } catch (Exception e) { + throw new IOException("Unable to AES encrypt the data", e); + } + + // write out our IV + buffer.writeBytes(iv, 0, IV_LENGTH_BYTE); // have to copy over the orig data, because we used the temp buffer - buffer.writeBytes(cryptoOutput, 0, encryptedLength); + buffer.writeBytes(writer.getBuffer(), 0, encryptedLength); + + if (DEBUG) { + String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE); + String crypto = ByteBufUtil.hexDump(writer.getBuffer(), 0, encryptedLength); + System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString + + OS.LINE_SEPARATOR + + "CRYPTO: (" + encryptedLength + ")" + OS.LINE_SEPARATOR + crypto); + } } + + /** + * BUFFER: + * +++++++++++++++++++++++++++++++ + * + IV (12) + encrypted data + + * +++++++++++++++++++++++++++++++ + * + * ENCRYPTED DATA: + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + uncompressed length (1-4 bytes) + compressed data + + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * COMPRESSED DATA: + * ++++++++++++++++++++++++++ + * + class and object bytes + + * ++++++++++++++++++++++++++ + */ public Object readCrypto(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { - // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.rmiSupport = connection.rmiSupport(); + // read out the crypto IV + final byte[] iv = new byte[IV_LENGTH_BYTE]; + buffer.readBytes(iv, 0 , IV_LENGTH_BYTE); - //////////////// - // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! - //////////////// + // have to adjust for the IV + length = length - IV_LENGTH_BYTE; - ByteBuf inputBuf = buffer; - - final long gcmIVCounter = OptimizeUtilsByteBuf.readLong(buffer, true); - int lengthLength = OptimizeUtilsByteArray.longLength(gcmIVCounter, true); - - - // have to adjust for the gcmIVCounter - length = length - lengthLength; - - - /////////// decrypting data - - // NOTE: compression and encryption MUST work with byte[] because they use JNI! - // Realistically, it is impossible to get the backing arrays out of a Heap Buffer once they are resized and begin to use - // sliced. It's lame that there is a "double copy" of bytes here, but I don't know how to avoid it... - // see: https://stackoverflow.com/questions/19296386/netty-java-getting-data-from-bytebuf - - byte[] inputArray; - int inputOffset; - - // Even if a ByteBuf has a backing array (i.e. buf.hasArray() returns true), the using it isn't always possible because - // the buffer might be a slice of other buffer or a pooled buffer: - //noinspection Duplicates - if (inputBuf.hasArray() && - inputBuf.array()[0] == inputBuf.getByte(0) && - inputBuf.array().length == inputBuf.capacity()) { - - // we can use it... - inputArray = inputBuf.array(); - inputArrayLength = -1; // this is so we don't REUSE this array accidentally! - inputOffset = inputBuf.arrayOffset() + lengthLength; - } - else { - // we can NOT use it. - if (length > inputArrayLength) { - inputArrayLength = length; - inputArray = new byte[length]; - this.inputArray = inputArray; - } - else { - inputArray = this.inputArray; - } - - inputBuf.getBytes(inputBuf.readerIndex(), inputArray, 0, length); - inputOffset = 0; - } - - - - // have to make sure to set the position of the buffer, since our conversion to array DOES NOT set the new reader index. - buffer.readerIndex(buffer.readerIndex() + length); - - // this is a threadlocal, so that we don't clobber other threads that are performing crypto on the same connection at the same time - final ParametersWithIV cryptoParameters = connection.getCryptoParameters(); - BigEndian.Long_.toBytes(gcmIVCounter, cryptoParameters.getIV(), 4); // put our counter into the IV - - final GCMBlockCipher aes = this.aesEngine; - aes.reset(); - aes.init(false, cryptoParameters); - - int cryptoSize = length - 16; // from: aes.getOutputSize(length); - - // lazy initialize the decrypt output buffer - byte[] decryptOutputArray; - if (cryptoSize > decryptOutputLength) { - decryptOutputLength = cryptoSize; - decryptOutputArray = new byte[cryptoSize]; - this.decryptOutput = decryptOutputArray; - - decryptBuf = Unpooled.wrappedBuffer(decryptOutputArray); - } else { - decryptOutputArray = this.decryptOutput; - } - - int decryptedLength = aes.processBytes(inputArray, inputOffset, length, decryptOutputArray, 0); + /////////// decrypt data + final SecretKey cryptoKey = connection.cryptoKey(); try { - // authentication tag for GCM - decryptedLength += aes.doFinal(decryptOutputArray, decryptedLength); + cipher.init(Cipher.DECRYPT_MODE, cryptoKey, new GCMParameterSpec(TAG_LENGTH_BIT, iv)); } catch (Exception e) { throw new IOException("Unable to AES decrypt the data", e); } + // have to copy out bytes, we reuse the reader byte array! + buffer.readBytes(reader.getBuffer(), 0, length); + + if (DEBUG) { + String ivString = ByteBufUtil.hexDump(iv, 0, IV_LENGTH_BYTE); + String crypto = ByteBufUtil.hexDump(reader.getBuffer(), 0, length); + System.err.println("IV: (12)" + OS.LINE_SEPARATOR + ivString + + OS.LINE_SEPARATOR + "CRYPTO: (" + length + ")" + OS.LINE_SEPARATOR + crypto); + } + + int decryptedLength; + try { + decryptedLength = cipher.doFinal(reader.getBuffer(),0, length, temp, 0); + } catch (Exception e) { + throw new IOException("Unable to AES decrypt the data", e); + } + ///////// decompress data -- as it's ALWAYS compressed // get the decompressed length (at the beginning of the array) - inputArray = decryptOutputArray; - final int uncompressedLength = OptimizeUtilsByteArray.readInt(inputArray, true); - inputOffset = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-4 bytes for the decompressed size + final int uncompressedLength = OptimizeUtilsByteArray.readInt(temp, true); - - byte[] decompressOutputArray = this.decompressOutput; - if (uncompressedLength > decompressOutputLength) { - decompressOutputLength = uncompressedLength; - decompressOutputArray = new byte[uncompressedLength]; - this.decompressOutput = decompressOutputArray; - - decompressBuf = Unpooled.wrappedBuffer(decompressOutputArray); // so we can read via kryo - } - inputBuf = decompressBuf; + // where does our data start, AFTER the length field + int start = OptimizeUtilsByteArray.intLength(uncompressedLength, true); // because 1-4 bytes for the uncompressed size; // LZ4 decompress, requires the size of the ORIGINAL length (because we use the FAST decompressor - decompressor.decompress(inputArray, inputOffset, decompressOutputArray, 0, uncompressedLength); + reader.reset(); + decompressor.decompress(temp, start, reader.getBuffer(), 0, uncompressedLength); + reader.setLimit(uncompressedLength); - inputBuf.setIndex(0, uncompressedLength); + if (DEBUG) { + int endWithoutUncompressedLength = decryptedLength - start; + String compressed = ByteBufUtil.hexDump(temp, start, endWithoutUncompressedLength); + String orig = ByteBufUtil.hexDump(reader.getBuffer(), 0, uncompressedLength); + System.err.println("COMPRESSED: (" + endWithoutUncompressedLength + ")" + OS.LINE_SEPARATOR + compressed + + OS.LINE_SEPARATOR + + "ORIG: (" + uncompressedLength + ")" + OS.LINE_SEPARATOR + orig); + } // read the object from the buffer. - reader.setBuffer(inputBuf); - - return readClassAndObject(reader); // this properly sets the readerIndex, but only if it's the correct buffer + return read(connection, reader); } + @Override protected void finalize() throws Throwable { - if (decompressBuf != null) { - decompressBuf.release(); - } - - if (decryptBuf != null) { - decryptBuf.release(); - } + readerBuff.getByteBuf().release(); + writerBuff.getByteBuf().release(); super.finalize(); } @@ -594,5 +486,4 @@ class KryoExtra extends Kryo { NetworkSerializationManager getSerializationManager() { return serializationManager; } - } diff --git a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java index d3d5ee83..20462de9 100644 --- a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java +++ b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java @@ -15,7 +15,7 @@ */ package dorkbox.network.connection.registration; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPoint; @@ -61,14 +61,14 @@ class ConnectionRegistrationImpl implements Connection_, ChannelHandler { @Override public - long getNextGcmSequence() { - return connection.getNextGcmSequence(); + long nextGcmSequence() { + return connection.nextGcmSequence(); } @Override public - ParametersWithIV getCryptoParameters() { - return connection.getCryptoParameters(); + SecretKey cryptoKey() { + return connection.cryptoKey(); } @Override diff --git a/src/dorkbox/network/connection/registration/MetaChannel.java b/src/dorkbox/network/connection/registration/MetaChannel.java index 9188188b..fb5b8b2a 100644 --- a/src/dorkbox/network/connection/registration/MetaChannel.java +++ b/src/dorkbox/network/connection/registration/MetaChannel.java @@ -17,6 +17,8 @@ package dorkbox.network.connection.registration; import java.util.concurrent.atomic.AtomicInteger; +import javax.crypto.SecretKey; + import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.params.ECPublicKeyParameters; @@ -44,9 +46,7 @@ class MetaChannel { public volatile ECPublicKeyParameters publicKey; // used for ECC crypto + handshake on NETWORK (remote) connections. This is the remote public key. public volatile AsymmetricCipherKeyPair ecdhKey; // used for ECC Diffie-Hellman-Merkle key exchanges: see http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange - // since we are using AES-GCM, the aesIV here **MUST** be exactly 12 bytes - public volatile byte[] aesKey; - public volatile byte[] aesIV; + public volatile SecretKey secretKey; // indicates if the remote ECC key has changed for an IP address. If the client detects this, it will not connect. // If the server detects this, it has the option for additional security (two-factor auth, perhaps?) diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 01b1a62b..21b1fb92 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -200,25 +200,6 @@ class RegistrationRemoteHandler extends RegistrationHandler { protected abstract String getConnectionDirection(); - /** - * @return true if validation was successful - */ - final - boolean invalidAES(final MetaChannel metaChannel) { - if (metaChannel.aesKey.length != 32) { - logger.error("Fatal error trying to use AES key (wrong key length)."); - return true; - } - // IV length must == 12 because we are using GCM! - else if (metaChannel.aesIV.length != 12) { - logger.error("Fatal error trying to use AES IV (wrong IV length)."); - return true; - } - - return false; - } - - /** * upgrades a channel ONE channel at a time */ diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java index 6ed73870..c0263e27 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerClient.java @@ -21,6 +21,8 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import javax.crypto.spec.SecretKeySpec; + import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; import org.bouncycastle.crypto.digests.SHA384Digest; @@ -99,7 +101,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // IN: session ID + public key + ecc parameters (which are a nonce. the SERVER defines what these are) // OUT: remote ECDH shared payload - if (metaChannel.aesKey == null && registration.publicKey != null) { + if (metaChannel.secretKey == null && registration.publicKey != null) { // whoa! Didn't send valid public key info! if (invalidPublicKey(registration, type)) { shutdown(channel, registration.sessionID); @@ -133,7 +135,7 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { // IN: remote ECDH shared payload // OUT: hasMore=true if we have more registrations to do, false otherwise - if (metaChannel.aesKey == null) { + if (metaChannel.secretKey == null) { /* * Diffie-Hellman-Merkle key exchange for the AES key * http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange @@ -161,14 +163,8 @@ class RegistrationRemoteHandlerClient extends RegistrationRemoteHandler { sha384.update(keySeed, 0, keySeed.length); sha384.doFinal(digest, 0); - metaChannel.aesKey = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) - metaChannel.aesIV = Arrays.copyOfRange(digest, 32, 44); // 96bit blocksize (12 bytes) required by AES-GCM - - if (invalidAES(metaChannel)) { - // abort if something messed up! - shutdown(channel, registration.sessionID); - return; - } + byte[] key = org.bouncycastle.util.Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) + metaChannel.secretKey = new SecretKeySpec(key, "AES"); Registration outboundRegister = new Registration(metaChannel.sessionId); diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java index 4fd8a2b3..e1eeb172 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandlerServer.java @@ -20,6 +20,8 @@ import java.net.InetSocketAddress; import java.security.SecureRandom; import java.util.concurrent.TimeUnit; +import javax.crypto.spec.SecretKeySpec; + import org.bouncycastle.crypto.AsymmetricCipherKeyPair; import org.bouncycastle.crypto.BasicAgreement; import org.bouncycastle.crypto.agreement.ECDHCBasicAgreement; @@ -123,7 +125,7 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { // IN: remote ECDH shared payload // OUT: server ECDH shared payload - if (metaChannel.aesKey == null) { + if (metaChannel.secretKey == null) { /* * Diffie-Hellman-Merkle key exchange for the AES key * http://en.wikipedia.org/wiki/Diffie%E2%80%93Hellman_key_exchange @@ -157,14 +159,8 @@ class RegistrationRemoteHandlerServer extends RegistrationRemoteHandler { sha384.update(keySeed, 0, keySeed.length); sha384.doFinal(digest, 0); - metaChannel.aesKey = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) - metaChannel.aesIV = Arrays.copyOfRange(digest, 32, 44); // 96bit blocksize (12 bytes) required by AES-GCM - - if (invalidAES(metaChannel)) { - // abort if something messed up! - shutdown(channel, registration.sessionID); - return; - } + byte[] key = Arrays.copyOfRange(digest, 0, 32); // 256bit keysize (32 bytes) + metaChannel.secretKey = new SecretKeySpec(key, "AES"); Registration outboundRegister = new Registration(metaChannel.sessionId); diff --git a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java index 06a09e28..6968320e 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelLocalWrapper.java @@ -17,7 +17,8 @@ package dorkbox.network.connection.wrapper; import java.util.concurrent.atomic.AtomicBoolean; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPoint; @@ -30,12 +31,14 @@ import io.netty.util.concurrent.Promise; public class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint { + private static final SecretKey dummyCryptoKey = new SecretKeySpec(new byte[32], "AES"); private final Channel channel; private final AtomicBoolean shouldFlush = new AtomicBoolean(false); private String remoteAddress; + public ChannelLocalWrapper(MetaChannel metaChannel) { this.channel = metaChannel.localChannel; @@ -95,8 +98,8 @@ class ChannelLocalWrapper implements ChannelWrapper, ConnectionPoint { @Override public - ParametersWithIV cryptoParameters() { - return null; + SecretKey cryptoKey() { + return dummyCryptoKey; } @Override diff --git a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java index 5eeca347..6dd2659a 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelNetworkWrapper.java @@ -17,15 +17,13 @@ package dorkbox.network.connection.wrapper; import java.net.InetSocketAddress; -import org.bouncycastle.crypto.params.KeyParameter; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.ISessionManager; import dorkbox.network.connection.registration.MetaChannel; -import dorkbox.util.FastThreadLocal; import io.netty.bootstrap.DatagramCloseMessage; import io.netty.util.NetUtil; @@ -43,11 +41,7 @@ class ChannelNetworkWrapper implements ChannelWrapper { private final String remoteAddress; private final boolean isLoopback; - // GCM IV. hacky way to prevent tons of GC and to not clobber the original parameters - private final byte[] aesKey; // AES-256 requires 32 bytes - private final byte[] aesIV; // AES-GCM requires 12 bytes - - private final FastThreadLocal cryptoParameters; + private final SecretKey secretKey; public ChannelNetworkWrapper(final MetaChannel metaChannel, final InetSocketAddress remoteAddress) { @@ -72,17 +66,8 @@ class ChannelNetworkWrapper implements ChannelWrapper { this.remoteAddress = remoteAddress.getAddress().getHostAddress(); this.remotePublicKeyChanged = metaChannel.changedRemoteKey; - // AES key & IV (only for networked connections) - aesKey = metaChannel.aesKey; - aesIV = metaChannel.aesIV; - - cryptoParameters = new FastThreadLocal() { - @Override - public - ParametersWithIV initialValue() { - return new ParametersWithIV(new KeyParameter(aesKey), aesIV); - } - }; + // AES key (only for networked connections) + secretKey = metaChannel.secretKey; } public final @@ -118,14 +103,12 @@ class ChannelNetworkWrapper implements ChannelWrapper { } /** - * @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal - * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't - * clobber each other + * @return the AES key. */ @Override public - ParametersWithIV cryptoParameters() { - return this.cryptoParameters.get(); + SecretKey cryptoKey() { + return this.secretKey; } @Override diff --git a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java index 22d4eb67..a1e5fdf0 100644 --- a/src/dorkbox/network/connection/wrapper/ChannelWrapper.java +++ b/src/dorkbox/network/connection/wrapper/ChannelWrapper.java @@ -15,7 +15,7 @@ */ package dorkbox.network.connection.wrapper; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPoint; @@ -33,11 +33,9 @@ interface ChannelWrapper { void flush(); /** - * @return a threadlocal AES key + IV. key=32 byte, iv=12 bytes (AES-GCM implementation). This is a threadlocal - * because multiple protocols can be performing crypto AT THE SAME TIME, and so we have to make sure that operations don't - * clobber each other + * @return the AES key. */ - ParametersWithIV cryptoParameters(); + SecretKey cryptoKey(); /** * @return true if this connection is connection on the loopback interface. This is specifically used to dynamically enable/disable diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCompression.java similarity index 65% rename from src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java rename to src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCompression.java index 582c2394..9e87b216 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpCompression.java @@ -22,27 +22,20 @@ import io.netty.channel.ChannelHandlerContext; // on client this is MessageToMessage (because of the UdpDecoder in the pipeline!) + public -class KryoDecoderCrypto extends KryoDecoder { +class KryoDecoderTcpCompression extends KryoDecoderTcp { public - KryoDecoderCrypto(final NetworkSerializationManager serializationManager) { + KryoDecoderTcpCompression(final NetworkSerializationManager serializationManager) { super(serializationManager); } @Override protected Object readObject(final NetworkSerializationManager serializationManager, - final ChannelHandlerContext context, - final ByteBuf in, - final int length) throws Exception { - - try { - Connection_ connection = (Connection_) context.pipeline() - .last(); - return serializationManager.readWithCrypto(connection, in, length); - } catch (Exception e) { - throw e; - } + final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.readWithCompression(connection, in, length); } } diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpNone.java b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpNone.java new file mode 100644 index 00000000..ca8c9812 --- /dev/null +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoderTcpNone.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.tcp; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +// on client this is MessageToMessage (because of the UdpDecoder in the pipeline!) + + +public +class KryoDecoderTcpNone extends KryoDecoderTcp { + + public + KryoDecoderTcpNone(final NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + @Override + protected + Object readObject(final NetworkSerializationManager serializationManager, + final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.read(connection, in, length); + } +} diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCompression.java b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCompression.java new file mode 100644 index 00000000..21a6da52 --- /dev/null +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpCompression.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.tcp; + +import java.io.IOException; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoEncoderTcpCompression extends KryoEncoderTcp { + + public + KryoEncoderTcpCompression(final NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + @Override + protected + void writeObject(final NetworkSerializationManager serializationManager, + final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException { + Connection_ connection = (Connection_) context.pipeline().last(); + serializationManager.writeWithCompression(connection, buffer, msg); + } +} diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpNone.java b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpNone.java new file mode 100644 index 00000000..aa51dc9b --- /dev/null +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoderTcpNone.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 dorkbox, llc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.tcp; + +import java.io.IOException; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoEncoderTcpNone extends KryoEncoderTcp { + + public + KryoEncoderTcpNone(final NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + @Override + protected + void writeObject(final NetworkSerializationManager serializationManager, + final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException { + Connection_ connection = (Connection_) context.pipeline().last(); + serializationManager.write(connection, buffer, msg); + } +} diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCompression.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCompression.java new file mode 100644 index 00000000..d4f00142 --- /dev/null +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCompression.java @@ -0,0 +1,38 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.udp; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoDecoderUdpCompression extends KryoDecoderUdp { + + public + KryoDecoderUdpCompression(NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + protected + Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.readWithCompression(connection, in, length); + } +} diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpNone.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpNone.java new file mode 100644 index 00000000..413b3c7c --- /dev/null +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpNone.java @@ -0,0 +1,38 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.udp; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoDecoderUdpNone extends KryoDecoderUdp { + + public + KryoDecoderUdpNone(NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + protected + Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { + Connection_ connection = (Connection_) context.pipeline().last(); + return serializationManager.read(connection, in, length); + } +} diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCompression.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCompression.java new file mode 100644 index 00000000..cdeb094a --- /dev/null +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCompression.java @@ -0,0 +1,41 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.udp; + +import java.io.IOException; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoEncoderUdpCompression extends KryoEncoderUdp { + + public + KryoEncoderUdpCompression(NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + @Override + protected + void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException { + Connection_ connection = (Connection_) ctx.pipeline().last(); + serializationManager.writeWithCompression(connection, buffer, msg); + } +} diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpNone.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpNone.java new file mode 100644 index 00000000..13c49a9d --- /dev/null +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpNone.java @@ -0,0 +1,41 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.pipeline.udp; + +import java.io.IOException; + +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +@Sharable +public +class KryoEncoderUdpNone extends KryoEncoderUdp { + + public + KryoEncoderUdpNone(NetworkSerializationManager serializationManager) { + super(serializationManager); + } + + @Override + protected + void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException { + Connection_ connection = (Connection_) ctx.pipeline().last(); + serializationManager.write(connection, buffer, msg); + } +} diff --git a/src/dorkbox/network/rmi/RmiNopConnection.java b/src/dorkbox/network/rmi/RmiNopConnection.java index e7c01ad9..7845d29a 100644 --- a/src/dorkbox/network/rmi/RmiNopConnection.java +++ b/src/dorkbox/network/rmi/RmiNopConnection.java @@ -14,7 +14,7 @@ */ package dorkbox.network.rmi; -import org.bouncycastle.crypto.params.ParametersWithIV; +import javax.crypto.SecretKey; import dorkbox.network.connection.ConnectionPoint; import dorkbox.network.connection.Connection_; @@ -130,13 +130,13 @@ class RmiNopConnection implements Connection_ { @Override public - long getNextGcmSequence() { + long nextGcmSequence() { return 0; } @Override public - ParametersWithIV getCryptoParameters() { + SecretKey cryptoKey() { return null; } } diff --git a/src/dorkbox/network/serialization/NetworkSerializationManager.java b/src/dorkbox/network/serialization/NetworkSerializationManager.java index 56bbc3f4..cf7f18e0 100644 --- a/src/dorkbox/network/serialization/NetworkSerializationManager.java +++ b/src/dorkbox/network/serialization/NetworkSerializationManager.java @@ -30,6 +30,34 @@ import io.netty.buffer.ByteBuf; public interface NetworkSerializationManager extends SerializationManager { + /** + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * There is a small speed penalty if there were no kryo's available to use. + */ + void write(Connection_ connection, ByteBuf buffer, Object message) throws IOException; + + /** + * Reads an object from the buffer. + * + * @param length should ALWAYS be the length of the expected object! + */ + Object read(Connection_ connection, ByteBuf buffer, int length) throws IOException; + + /** + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * There is a small speed penalty if there were no kryo's available to use. + */ + void writeWithCompression(Connection_ connection, ByteBuf buffer, Object message) throws IOException; + + /** + * Reads an object from the buffer. + * + * @param length should ALWAYS be the length of the expected object! + */ + Object readWithCompression(Connection_ connection, ByteBuf buffer, int length) throws IOException; + /** * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. *

@@ -39,13 +67,8 @@ interface NetworkSerializationManager extends SerializationManager { /** * Reads an object from the buffer. - *

- * Crypto + sequence number * - * @param connection - * can be NULL - * @param length - * should ALWAYS be the length of the expected object! + * @param length should ALWAYS be the length of the expected object! */ Object readWithCrypto(Connection_ connection, ByteBuf buffer, int length) throws IOException; diff --git a/src/dorkbox/network/serialization/Serialization.java b/src/dorkbox/network/serialization/Serialization.java index 6681ecbc..d6f8123f 100644 --- a/src/dorkbox/network/serialization/Serialization.java +++ b/src/dorkbox/network/serialization/Serialization.java @@ -775,38 +775,23 @@ class Serialization implements NetworkSerializationManager { /** * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - *

+ *

* There is a small speed penalty if there were no kryo's available to use. */ @Override public final - void writeWithCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { + void writeWithCompression(Connection_ connection, ByteBuf buffer, Object message) throws IOException { final KryoExtra kryo = kryoPool.take(); try { - // we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU - if (connection.isLoopback()) { - if (wireWriteLogger.isTraceEnabled()) { - int start = buffer.writerIndex(); - kryo.writeCompressed(connection, buffer, message); - int end = buffer.writerIndex(); + if (wireWriteLogger.isTraceEnabled()) { + int start = buffer.writerIndex(); + kryo.writeCompressed(connection, buffer, message); + int end = buffer.writerIndex(); - wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); - } - else { - kryo.writeCompressed(connection, buffer, message); - } + wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); } else { - if (wireWriteLogger.isTraceEnabled()) { - int start = buffer.writerIndex(); - kryo.writeCrypto(connection, buffer, message); - int end = buffer.writerIndex(); - - wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); - } - else { - kryo.writeCrypto(connection, buffer, message); - } + kryo.writeCompressed(connection, buffer, message); } } finally { kryoPool.put(kryo); @@ -815,8 +800,111 @@ class Serialization implements NetworkSerializationManager { /** * Reads an object from the buffer. + * + * @param length should ALWAYS be the length of the expected object! + */ + @Override + public final + Object read(Connection_ connection, ByteBuf buffer, int length) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + if (wireReadLogger.isTraceEnabled()) { + int start = buffer.readerIndex(); + Object object = kryo.read(connection, buffer); + int end = buffer.readerIndex(); + + wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); + + return object; + } + else { + return kryo.read(connection, buffer); + } + } finally { + kryoPool.put(kryo); + } + } + + /** + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * There is a small speed penalty if there were no kryo's available to use. + */ + @Override + public final + void write(Connection_ connection, ByteBuf buffer, Object message) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + if (wireWriteLogger.isTraceEnabled()) { + int start = buffer.writerIndex(); + kryo.write(connection, buffer, message); + int end = buffer.writerIndex(); + + wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); + } + else { + kryo.write(connection, buffer, message); + } + } finally { + kryoPool.put(kryo); + } + } + + /** + * Reads an object from the buffer. + * + * @param connection can be NULL + * @param length should ALWAYS be the length of the expected object! + */ + @Override + public final + Object readWithCompression(Connection_ connection, ByteBuf buffer, int length) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + if (wireReadLogger.isTraceEnabled()) { + int start = buffer.readerIndex(); + Object object = kryo.readCompressed(connection, buffer, length); + int end = buffer.readerIndex(); + + wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); + + return object; + } + else { + return kryo.readCompressed(connection, buffer, length); + } + } finally { + kryoPool.put(kryo); + } + } + + /** + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. *

- * Crypto + sequence number + * There is a small speed penalty if there were no kryo's available to use. + */ + @Override + public final + void writeWithCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { + final KryoExtra kryo = kryoPool.take(); + try { + if (wireWriteLogger.isTraceEnabled()) { + int start = buffer.writerIndex(); + kryo.writeCrypto(connection, buffer, message); + int end = buffer.writerIndex(); + + wireWriteLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); + } + else { + kryo.writeCrypto(connection, buffer, message); + } + } finally { + kryoPool.put(kryo); + } + } + + /** + * Reads an object from the buffer. * * @param connection can be NULL * @param length should ALWAYS be the length of the expected object! @@ -827,34 +915,17 @@ class Serialization implements NetworkSerializationManager { Object readWithCrypto(final Connection_ connection, final ByteBuf buffer, final int length) throws IOException { final KryoExtra kryo = kryoPool.take(); try { - // we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU - if (connection.isLoopback()) { - if (wireReadLogger.isTraceEnabled()) { - int start = buffer.readerIndex(); - Object object = kryo.readCompressed(connection, buffer, length); - int end = buffer.readerIndex(); + if (wireReadLogger.isTraceEnabled()) { + int start = buffer.readerIndex(); + Object object = kryo.readCrypto(connection, buffer, length); + int end = buffer.readerIndex(); - wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); + wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); - return object; - } - else { - return kryo.readCompressed(connection, buffer, length); - } + return object; } else { - if (wireReadLogger.isTraceEnabled()) { - int start = buffer.readerIndex(); - Object object = kryo.readCrypto(connection, buffer, length); - int end = buffer.readerIndex(); - - wireReadLogger.trace(ByteBufUtil.hexDump(buffer, start, end - start)); - - return object; - } - else { - return kryo.readCrypto(connection, buffer, length); - } + return kryo.readCrypto(connection, buffer, length); } } finally { kryoPool.put(kryo);