Storage now uses file.channel locks to prevent multiple jvm's from trashing the same file
This commit is contained in:
parent
32d7e2ec4a
commit
af057edd79
@ -6,6 +6,7 @@ import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
@ -112,17 +113,22 @@ public class Metadata {
|
||||
static Metadata readHeader(RandomAccessFile file, int position) throws IOException {
|
||||
byte[] buf = new byte[KEY_SIZE];
|
||||
long origHeaderKeyPointer = Metadata.getMetaDataPointer(position);
|
||||
|
||||
FileLock lock = file.getChannel().lock(origHeaderKeyPointer, INDEX_ENTRY_LENGTH, true);
|
||||
file.seek(origHeaderKeyPointer);
|
||||
file.readFully(buf);
|
||||
lock.release();
|
||||
|
||||
Metadata r = new Metadata(ByteArrayWrapper.wrap(buf));
|
||||
r.indexPosition = position;
|
||||
|
||||
long recordHeaderPointer = Metadata.getDataPointer(position);
|
||||
lock = file.getChannel().lock(origHeaderKeyPointer, KEY_SIZE, true);
|
||||
file.seek(recordHeaderPointer);
|
||||
r.dataPointer = file.readLong();
|
||||
r.dataCapacity = file.readInt();
|
||||
r.dataCount = file.readInt();
|
||||
lock.release();
|
||||
|
||||
if (r.dataPointer == 0L || r.dataCapacity == 0L || r.dataCount == 0L) {
|
||||
return null;
|
||||
@ -131,26 +137,24 @@ public class Metadata {
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
void write(RandomAccessFile file) throws IOException {
|
||||
writeMetaDataInfo(file);
|
||||
writeDataInfo(file);
|
||||
}
|
||||
|
||||
void writeMetaDataInfo(RandomAccessFile file) throws IOException {
|
||||
long recordKeyPointer = Metadata.getMetaDataPointer(this.indexPosition);
|
||||
|
||||
FileLock lock = file.getChannel().lock(recordKeyPointer, KEY_SIZE, false);
|
||||
file.seek(recordKeyPointer);
|
||||
file.write(this.key.getBytes());
|
||||
lock.release();
|
||||
}
|
||||
|
||||
void writeDataInfo(RandomAccessFile file) throws IOException {
|
||||
long recordHeaderPointer = getDataPointer(this.indexPosition);
|
||||
|
||||
FileLock lock = file.getChannel().lock(recordHeaderPointer, POINTER_INFO_SIZE, false);
|
||||
file.seek(recordHeaderPointer);
|
||||
file.writeLong(this.dataPointer);
|
||||
file.writeInt(this.dataCapacity);
|
||||
file.writeInt(this.dataCount);
|
||||
lock.release();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -160,12 +164,16 @@ public class Metadata {
|
||||
byte[] buf = new byte[KEY_SIZE];
|
||||
|
||||
long origHeaderKeyPointer = Metadata.getMetaDataPointer(this.indexPosition);
|
||||
FileLock lock = file.getChannel().lock(origHeaderKeyPointer, INDEX_ENTRY_LENGTH, true);
|
||||
file.seek(origHeaderKeyPointer);
|
||||
file.readFully(buf);
|
||||
lock.release();
|
||||
|
||||
long newHeaderKeyPointer = Metadata.getMetaDataPointer(newIndex);
|
||||
lock = file.getChannel().lock(newHeaderKeyPointer, INDEX_ENTRY_LENGTH, false);
|
||||
file.seek(newHeaderKeyPointer);
|
||||
file.write(buf);
|
||||
lock.release();
|
||||
|
||||
// System.err.println("updating ptr: " + this.indexPosition + " -> " + newIndex + " @ " + newHeaderKeyPointer + "-" + (newHeaderKeyPointer+INDEX_ENTRY_LENGTH));
|
||||
this.indexPosition = newIndex;
|
||||
@ -184,6 +192,8 @@ public class Metadata {
|
||||
this.dataPointer = position;
|
||||
this.dataCapacity = this.dataCount;
|
||||
|
||||
FileLock lock = file.getChannel().lock(position, this.dataCount, false);
|
||||
|
||||
// update the file size
|
||||
file.setLength(position + this.dataCount);
|
||||
|
||||
@ -192,6 +202,7 @@ public class Metadata {
|
||||
// save the data
|
||||
file.seek(position);
|
||||
file.write(data);
|
||||
lock.release();
|
||||
|
||||
// update header pointer info
|
||||
writeDataInfo(file);
|
||||
@ -206,10 +217,13 @@ public class Metadata {
|
||||
byte[] buf = new byte[this.dataCount];
|
||||
// System.err.print("Reading data: " + this.indexPosition + " @ " + this.dataPointer + "-" + (this.dataPointer+this.dataCount) + " -- ");
|
||||
|
||||
FileLock lock = file.getChannel().lock(this.dataPointer, this.dataCount, true);
|
||||
file.seek(this.dataPointer);
|
||||
file.readFully(buf);
|
||||
lock.release();
|
||||
// Sys.printArray(buf, buf.length, false, 0);
|
||||
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
@ -251,10 +265,14 @@ public class Metadata {
|
||||
|
||||
void writeData(ByteArrayOutputStream byteArrayOutputStream, RandomAccessFile file) throws IOException {
|
||||
this.dataCount = byteArrayOutputStream.size();
|
||||
|
||||
FileLock lock = file.getChannel().lock(this.dataPointer, this.dataCount, false);
|
||||
|
||||
FileOutputStream out = new FileOutputStream(file.getFD());
|
||||
file.seek(this.dataPointer);
|
||||
byteArrayOutputStream.writeTo(out);
|
||||
out.flush();
|
||||
lock.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,6 +7,7 @@ import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
@ -25,6 +26,10 @@ import com.esotericsoftware.kryo.Kryo;
|
||||
|
||||
import dorkbox.util.bytes.ByteArrayWrapper;
|
||||
|
||||
// a note on file locking between c and java
|
||||
// http://panks-dev.blogspot.de/2008/04/linux-file-locks-java-and-others.html
|
||||
// Also, file locks on linux are ADVISORY. if an app doesn't care about locks, then it can do stuff -- even if locked by another app
|
||||
|
||||
public class StorageBase {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@ -340,8 +345,10 @@ public class StorageBase {
|
||||
if (this.memoryIndex.size() == 1) {
|
||||
// if we are the ONLY one, then we can do things differently.
|
||||
// just dump the data agian to disk.
|
||||
FileLock lock = this.file.getChannel().lock(this.dataPosition, Long.MAX_VALUE, false); // don't know how big it is, so max value it
|
||||
this.file.seek(this.dataPosition); // this is the end of the file, we know this ahead-of-time
|
||||
metaData.writeDataFast(this.kryo, object, fileOutputStream);
|
||||
lock.release();
|
||||
} else {
|
||||
// this is comparatively slow, since we serialize it first to get the size, then we put it in the file.
|
||||
ByteArrayOutputStream dataStream = metaData.getDataStream(this.kryo, object, deflater);
|
||||
@ -428,7 +435,9 @@ public class StorageBase {
|
||||
private void deleteRecordData(Metadata deletedRecord) throws IOException {
|
||||
if (this.file.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) {
|
||||
// shrink file since this is the last record in the file
|
||||
FileLock lock = this.file.getChannel().lock(deletedRecord.dataPointer, Long.MAX_VALUE, false);
|
||||
this.file.setLength(deletedRecord.dataPointer);
|
||||
lock.release();
|
||||
} else {
|
||||
Metadata previous = index_getMetaDataFromData(deletedRecord.dataPointer - 1);
|
||||
if (previous != null) {
|
||||
@ -474,8 +483,10 @@ public class StorageBase {
|
||||
private final void setVersionNumber(RandomAccessFile file, int versionNumber) throws IOException {
|
||||
this.databaseVersion = versionNumber;
|
||||
|
||||
FileLock lock = this.file.getChannel().lock(VERSION_HEADER_LOCATION, 4, false);
|
||||
file.seek(VERSION_HEADER_LOCATION);
|
||||
file.writeInt(versionNumber);
|
||||
lock.release();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -484,8 +495,10 @@ public class StorageBase {
|
||||
private final void setRecordCount(RandomAccessFile file, int numberOfRecords) throws IOException {
|
||||
this.numberOfRecords = numberOfRecords;
|
||||
|
||||
FileLock lock = this.file.getChannel().lock(NUM_RECORDS_HEADER_LOCATION, 4, false);
|
||||
file.seek(NUM_RECORDS_HEADER_LOCATION);
|
||||
file.writeInt(numberOfRecords);
|
||||
lock.release();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -494,8 +507,10 @@ public class StorageBase {
|
||||
private final void setDataPosition(RandomAccessFile file, long dataPositionPointer) throws IOException {
|
||||
this.dataPosition = dataPositionPointer;
|
||||
|
||||
FileLock lock = this.file.getChannel().lock(DATA_START_HEADER_LOCATION, 8, false);
|
||||
file.seek(DATA_START_HEADER_LOCATION);
|
||||
file.writeLong(dataPositionPointer);
|
||||
lock.release();
|
||||
}
|
||||
|
||||
int getVersion() {
|
||||
@ -516,15 +531,19 @@ public class StorageBase {
|
||||
* of the record data of the RecordHeader which is returned. Returns null if the location is not part of a record.
|
||||
* (O(n) mem accesses)
|
||||
*/
|
||||
private final Metadata index_getMetaDataFromData(long targetFp) {
|
||||
private final Metadata index_getMetaDataFromData(long targetFp) throws IOException {
|
||||
Iterator<Metadata> iterator = this.memoryIndex.values().iterator();
|
||||
|
||||
FileLock lock = this.file.getChannel().lock(FILE_HEADERS_REGION_LENGTH, this.dataPosition, true);
|
||||
while (iterator.hasNext()) {
|
||||
Metadata next = iterator.next();
|
||||
if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) {
|
||||
lock.release();
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
||||
lock.release();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user