Fixed missing file-region lock on writing data. Fixed bug with moving data position/pointers around

This commit is contained in:
nathan 2014-09-29 23:38:28 +02:00
parent c34b2ea222
commit 05b47a7f5f
2 changed files with 87 additions and 53 deletions

View File

@ -3,11 +3,9 @@ package dorkbox.util.storage;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
@ -237,30 +235,16 @@ public class Metadata {
return (T) readObject; return (T) readObject;
} }
ByteArrayOutputStream getDataStream(Kryo kryo, Object data, Deflater deflater) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
OutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
Output output = new Output(outputStream, 1024); // write 1024 at a time
kryo.writeClassAndObject(output, data);
output.flush();
outputStream.flush();
outputStream.close();
return byteArrayOutputStream;
}
/** /**
* Writes data to the end of the file (which is where the datapointer is at) * Writes data to the end of the file (which is where the datapointer is at)
*/ */
void writeDataFast(Kryo kryo, Object data, DeflaterOutputStream outputStream) throws IOException { void writeDataFast(Kryo kryo, Object data, DeflaterOutputStream outputStream) throws IOException {
// HAVE TO LOCK BEFORE THIS IS CALLED! (AND FREE AFTERWARDS!)
Output output = new Output(outputStream, 1024); // write 1024 at a time Output output = new Output(outputStream, 1024); // write 1024 at a time
kryo.writeClassAndObject(output, data); kryo.writeClassAndObject(output, data);
output.flush(); output.flush();
outputStream.flush(); outputStream.flush(); // sync-flush is enabled, so the output stream will finish compressing data.
outputStream.finish(); // have to make sure that the outputstream finishes compressing data
} }
void writeData(ByteArrayOutputStream byteArrayOutputStream, RandomAccessFile file) throws IOException { void writeData(ByteArrayOutputStream byteArrayOutputStream, RandomAccessFile file) throws IOException {

View File

@ -5,6 +5,7 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
@ -23,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import dorkbox.util.bytes.ByteArrayWrapper; import dorkbox.util.bytes.ByteArrayWrapper;
@ -68,7 +70,7 @@ public class StorageBase {
private int databaseVersion = 0; private int databaseVersion = 0;
/** /**
* Actual number of bytes of data held in this record (4 bytes). * Number of records (4 bytes).
*/ */
private int numberOfRecords; private int numberOfRecords;
@ -138,10 +140,10 @@ public class StorageBase {
this.kryo.setRegistrationRequired(false); this.kryo.setRegistrationRequired(false);
this.deflater = new Deflater(7, true); this.deflater = new Deflater(7, true);
this.outputStream = new DeflaterOutputStream(new FileOutputStream(this.file.getFD()), this.deflater); this.outputStream = new DeflaterOutputStream(new FileOutputStream(this.file.getFD()), this.deflater, 1024, true);
this.inflater = new Inflater(true); this.inflater = new Inflater(true);
this.inputStream = new InflaterInputStream(new FileInputStream(this.file.getFD()), this.inflater); this.inputStream = new InflaterInputStream(new FileInputStream(this.file.getFD()), this.inflater, 1024);
this.weight = .5F; this.weight = .5F;
this.memoryIndex = new ConcurrentHashMap<ByteArrayWrapper, Metadata>(this.numberOfRecords); this.memoryIndex = new ConcurrentHashMap<ByteArrayWrapper, Metadata>(this.numberOfRecords);
@ -155,6 +157,11 @@ public class StorageBase {
} }
this.memoryIndex.put(meta.key, meta); this.memoryIndex.put(meta.key, meta);
} }
if (this.memoryIndex.size() != this.numberOfRecords) {
setRecordCount(this.file, this.memoryIndex.size());
this.logger.warn("Mismatch record count in storage, auto-correcting size.");
}
} }
/** /**
@ -275,7 +282,7 @@ public class StorageBase {
Metadata delRec = this.memoryIndex.get(key); Metadata delRec = this.memoryIndex.get(key);
try { try {
deleteRecordData(delRec); deleteRecordData(delRec, delRec.dataCapacity);
deleteRecordIndex(key, delRec); deleteRecordIndex(key, delRec);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
@ -338,24 +345,30 @@ public class StorageBase {
deflater.reset(); deflater.reset();
Metadata metaData = this.memoryIndex.get(key); Metadata metaData = this.memoryIndex.get(key);
int currentRecordCount = this.numberOfRecords;
if (metaData != null) { if (metaData != null) {
// now we have to UPDATE instead of add! // now we have to UPDATE instead of add!
try { try {
if (this.memoryIndex.size() == 1) { if (currentRecordCount == 1) {
// if we are the ONLY one, then we can do things differently. // if we are the ONLY one, then we can do things differently.
// just dump the data agian to disk. // just dump the data agian to disk.
FileLock lock = this.file.getChannel().lock(this.dataPosition, Long.MAX_VALUE-this.dataPosition, false); // don't know how big it is, so max value it FileLock lock = this.file.getChannel().lock(this.dataPosition, Long.MAX_VALUE-this.dataPosition, false); // don't know how big it is, so max value it
this.file.seek(this.dataPosition); // this is the end of the file, we know this ahead-of-time this.file.seek(this.dataPosition); // this is the end of the file, we know this ahead-of-time
metaData.writeDataFast(this.kryo, object, fileOutputStream); metaData.writeDataFast(this.kryo, object, fileOutputStream);
// have to re-specify the capacity and size
int sizeOfWrittenData = (int) (this.file.length() - this.dataPosition);
metaData.dataCapacity = sizeOfWrittenData;
metaData.dataCount = sizeOfWrittenData;
lock.release(); lock.release();
} else { } else {
// this is comparatively slow, since we serialize it first to get the size, then we put it in the file. // this is comparatively slow, since we serialize it first to get the size, then we put it in the file.
ByteArrayOutputStream dataStream = metaData.getDataStream(this.kryo, object, deflater); ByteArrayOutputStream dataStream = getDataAsByteArray(this.kryo, object, deflater);
int size = dataStream.size(); int size = dataStream.size();
if (size > metaData.dataCapacity) { if (size > metaData.dataCapacity) {
deleteRecordData(metaData); deleteRecordData(metaData, size);
// stuff this record to the end of the file, since it won't fit in it's current location // stuff this record to the end of the file, since it won't fit in it's current location
metaData.dataPointer = this.file.length(); metaData.dataPointer = this.file.length();
// have to make sure that the CAPACITY of the new one is the SIZE of the new data! // have to make sure that the CAPACITY of the new one is the SIZE of the new data!
@ -371,8 +384,6 @@ public class StorageBase {
this.logger.error("Error while saving data to disk", e); this.logger.error("Error while saving data to disk", e);
} }
} else { } else {
int currentRecordCount = this.memoryIndex.size();
try { try {
// try to move the read head in order // try to move the read head in order
setRecordCount(this.file, currentRecordCount+1); setRecordCount(this.file, currentRecordCount+1);
@ -386,14 +397,17 @@ public class StorageBase {
metaData = new Metadata(key, currentRecordCount, length); metaData = new Metadata(key, currentRecordCount, length);
metaData.writeMetaDataInfo(this.file); metaData.writeMetaDataInfo(this.file);
// update index // add new entry to the index
this.memoryIndex.put(key, metaData); this.memoryIndex.put(key, metaData);
setRecordCount(this.file, currentRecordCount + 1);
// save out the data. Because we KNOW that we are writing this to the end of the file, // save out the data. Because we KNOW that we are writing this to the end of the file,
// there are some tricks we can use. // there are some tricks we can use.
this.file.seek(length); // this is the end of the file, we know this ahead-of-time
FileLock lock = this.file.getChannel().lock(length, Long.MAX_VALUE-length, false); // don't know how big it is, so max value it
this.file.seek(length); // this is the end of the file, we know this ahead-of-time
metaData.writeDataFast(this.kryo, object, fileOutputStream); metaData.writeDataFast(this.kryo, object, fileOutputStream);
lock.release();
metaData.dataCount = deflater.getTotalOut(); metaData.dataCount = deflater.getTotalOut();
metaData.dataCapacity = metaData.dataCount; metaData.dataCapacity = metaData.dataCount;
@ -409,6 +423,20 @@ public class StorageBase {
metaData.objectReferenceCache = new WeakReference<Object>(object); metaData.objectReferenceCache = new WeakReference<Object>(object);
} }
private ByteArrayOutputStream getDataAsByteArray(Kryo kryo, Object data, Deflater deflater) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
OutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
Output output = new Output(outputStream, 1024); // write 1024 at a time
kryo.writeClassAndObject(output, data);
output.flush();
outputStream.flush();
outputStream.close();
return byteArrayOutputStream;
}
void doActionThings(Map<ByteArrayWrapper, Object> actions) { void doActionThings(Map<ByteArrayWrapper, Object> actions) {
DeflaterOutputStream outputStream2 = this.outputStream; DeflaterOutputStream outputStream2 = this.outputStream;
Deflater deflater2 = this.deflater; Deflater deflater2 = this.deflater;
@ -432,35 +460,63 @@ public class StorageBase {
///////////////////// /////////////////////
///////////////////// /////////////////////
private void deleteRecordData(Metadata deletedRecord) throws IOException {
/**
* "intelligent" move strategy.
*
* we should increase by some weight (ie: .5) would increase the number of allocated
* record headers by 50%, instead of just incrementing them by one at a time.
*/
private final int getWeightedNewRecordCount(int numberOfRecords) {
int newNumberOfRecords = numberOfRecords + 1 + (int) (numberOfRecords * this.weight); // int used for rounding
return newNumberOfRecords;
}
private void deleteRecordData(Metadata deletedRecord, int sizeOfDataToAdd) throws IOException {
if (this.file.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) { if (this.file.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) {
// shrink file since this is the last record in the file // shrink file since this is the last record in the file
FileLock lock = this.file.getChannel().lock(deletedRecord.dataPointer, Long.MAX_VALUE-deletedRecord.dataPointer, false); FileLock lock = this.file.getChannel().lock(deletedRecord.dataPointer, Long.MAX_VALUE-deletedRecord.dataPointer, false);
this.file.setLength(deletedRecord.dataPointer); this.file.setLength(deletedRecord.dataPointer);
lock.release(); lock.release();
} else { } else {
Metadata previous = index_getMetaDataFromData(deletedRecord.dataPointer - 1); // we MIGHT be the FIRST record
if (previous != null) { Metadata first = index_getMetaDataFromData(this.dataPosition);
// append space of deleted record onto previous record if (first == deletedRecord) {
previous.dataCapacity += deletedRecord.dataCapacity; // the record to delete is the FIRST (of many) in the file.
previous.writeDataInfo(this.file); // the FASTEST way to delete is to grow the number of allowed records!
} else { // Another option is to move the #2 data to the first data, but then there is the same gap after #2.
// because there is no "previous", that means we MIGHT be the FIRST record
Metadata first = index_getMetaDataFromData(this.dataPosition);
if (first == deletedRecord) { int numberOfRecords = this.numberOfRecords;
// the record to delete is the FIRST (of many) in the file.
// the FASTEST way to delete is to grow the number of allowed records! // "intelligent" move strategy.
// Another option is to move the #2 data to the first data, but then there is the same gap after #2. int newNumberOfRecords = getWeightedNewRecordCount(numberOfRecords);
setDataPosition(this.file, deletedRecord.dataPointer + deletedRecord.dataCapacity); long endIndexPointer = Metadata.getMetaDataPointer(newNumberOfRecords);
long endOfDataPointer = deletedRecord.dataPointer + deletedRecord.dataCapacity;
long newEndOfDataPointer = endOfDataPointer-sizeOfDataToAdd;
if (endIndexPointer < this.dataPosition && endIndexPointer <= newEndOfDataPointer) {
// one option is to shrink the RECORD section to fit the new data
setDataPosition(this.file, newEndOfDataPointer);
} else { } else {
// option two is to grow the RECORD section, and put the data at the end of the file
setDataPosition(this.file, endOfDataPointer);
}
} else {
Metadata previous = index_getMetaDataFromData(deletedRecord.dataPointer - 1);
if (previous != null) {
// append space of deleted record onto previous record
previous.dataCapacity += deletedRecord.dataCapacity;
previous.writeDataInfo(this.file);
} else {
// because there is no "previous", that means we MIGHT be the FIRST record
// well, we're not the first record. which one is RIGHT before us? // well, we're not the first record. which one is RIGHT before us?
// it should be "previous", so something fucked up // it should be "previous", so something fucked up
this.logger.error("Trying to delete an object, and it's in a weird state"); this.logger.error("Trying to delete an object, and it's in a weird state");
} }
} }
} }
} }
private void deleteRecordIndex(ByteArrayWrapper key, Metadata deleteRecord) throws IOException { private void deleteRecordIndex(ByteArrayWrapper key, Metadata deleteRecord) throws IOException {
@ -534,16 +590,13 @@ public class StorageBase {
private final Metadata index_getMetaDataFromData(long targetFp) throws IOException { private final Metadata index_getMetaDataFromData(long targetFp) throws IOException {
Iterator<Metadata> iterator = this.memoryIndex.values().iterator(); Iterator<Metadata> iterator = this.memoryIndex.values().iterator();
FileLock lock = this.file.getChannel().lock(FILE_HEADERS_REGION_LENGTH, this.dataPosition, true);
while (iterator.hasNext()) { while (iterator.hasNext()) {
Metadata next = iterator.next(); Metadata next = iterator.next();
if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) { if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) {
lock.release();
return next; return next;
} }
} }
lock.release();
return null; return null;
} }
@ -552,9 +605,8 @@ public class StorageBase {
* Ensure index capacity. This operation makes sure the INDEX REGION is large enough to accommodate additional entries. * Ensure index capacity. This operation makes sure the INDEX REGION is large enough to accommodate additional entries.
*/ */
private final void ensureIndexCapacity(RandomAccessFile file) throws IOException { private final void ensureIndexCapacity(RandomAccessFile file) throws IOException {
int numberOfRecords = this.memoryIndex.size(); // because we are zero indexed, this is ALSO the index where the record will START int numberOfRecords = this.numberOfRecords; // because we are zero indexed, this is ALSO the index where the record will START
int newNumberOfRecords = numberOfRecords + 1; // +1 because this is where that index will END (the start of the NEXT one) long endIndexPointer = Metadata.getMetaDataPointer(numberOfRecords + 1); // +1 because this is where that index will END (the start of the NEXT one)
long endIndexPointer = Metadata.getMetaDataPointer(newNumberOfRecords);
// just set the data position to the end of the file, since we don't have any data yet. // just set the data position to the end of the file, since we don't have any data yet.
if (endIndexPointer > file.length() && numberOfRecords == 0) { if (endIndexPointer > file.length() && numberOfRecords == 0) {
@ -574,9 +626,7 @@ public class StorageBase {
// otherwise, we have to grow our index. // otherwise, we have to grow our index.
Metadata first; Metadata first;
// "intelligent" move strategy. // "intelligent" move strategy.
// we should increase by some weight (ie: .5) would increase the number of allocated int newNumberOfRecords = getWeightedNewRecordCount(numberOfRecords);
// record headers by 50%, instead of just incrementing them by one at a time.
newNumberOfRecords = newNumberOfRecords + (int) (numberOfRecords * this.weight);
endIndexPointer = Metadata.getMetaDataPointer(newNumberOfRecords); endIndexPointer = Metadata.getMetaDataPointer(newNumberOfRecords);
// sometimes the endIndexPointer is in the middle of data, so we cannot move a record to where // sometimes the endIndexPointer is in the middle of data, so we cannot move a record to where