Converted storage to use the 'single writer principle' for read speed

This commit is contained in:
nathan 2017-08-04 17:33:54 +02:00
parent 85a745dc3a
commit 99b95074ab
3 changed files with 245 additions and 178 deletions

View File

@ -17,11 +17,11 @@ package dorkbox.util.storage;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -38,14 +38,23 @@ import dorkbox.util.SerializationManager;
class DiskStorage implements Storage { class DiskStorage implements Storage {
// null if we are a read-only storage // null if we are a read-only storage
private final DelayTimer timer; private final DelayTimer timer;
// must be volatile
private volatile HashMap<StorageKey, Object> actionMap = new HashMap<StorageKey, Object>();
private final Object singleWriterLock = new Object[0];
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<DiskStorage, HashMap> actionMapREF =
AtomicReferenceFieldUpdater.newUpdater(DiskStorage.class, HashMap.class, "actionMap");
private final StorageBase storage; private final StorageBase storage;
private final AtomicInteger references = new AtomicInteger(1); private final AtomicInteger references = new AtomicInteger(1);
private final ReentrantLock actionLock = new ReentrantLock();
private final AtomicBoolean isOpen = new AtomicBoolean(false); private final AtomicBoolean isOpen = new AtomicBoolean(false);
private volatile long milliSeconds = 3000L; private volatile long milliSeconds = 3000L;
private volatile Map<StorageKey, Object> actionMap = new ConcurrentHashMap<StorageKey, Object>();
/** /**
* Creates or opens a new database file. * Creates or opens a new database file.
@ -61,16 +70,14 @@ class DiskStorage implements Storage {
@Override @Override
public public
void run() { void run() {
ReentrantLock actionLock = DiskStorage.this.actionLock;
Map<StorageKey, Object> actions; Map<StorageKey, Object> actions;
try {
actionLock.lock(); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
// do a fast swap on the actionMap. // do a fast swap on the actionMap.
actions = DiskStorage.this.actionMap; actions = DiskStorage.this.actionMap;
DiskStorage.this.actionMap = new ConcurrentHashMap<StorageKey, Object>(); DiskStorage.this.actionMap = new HashMap<StorageKey, Object>();
} finally {
actionLock.unlock();
} }
DiskStorage.this.storage.doActionThings(actions); DiskStorage.this.storage.doActionThings(actions);
@ -114,8 +121,11 @@ class DiskStorage implements Storage {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }
// access a snapshot of the actionMap (single-writer-principle)
final HashMap actionMap = actionMapREF.get(this);
// check if our pending actions has it, or if our storage index has it // check if our pending actions has it, or if our storage index has it
return this.actionMap.containsKey(key) || this.storage.contains(key); return actionMap.containsKey(key) || this.storage.contains(key);
} }
/** /**
@ -175,24 +185,21 @@ class DiskStorage implements Storage {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }
// if the object in is pending, we get it from there // access a snapshot of the actionMap (single-writer-principle)
try { final HashMap actionMap = actionMapREF.get(this);
this.actionLock.lock();
Object object = this.actionMap.get(key); // if the object in is pending, we get it from there
Object object = actionMap.get(key);
if (object != null) { if (object != null) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T returnObject = (T) object; T returnObject = (T) object;
return returnObject; return returnObject;
} }
} finally {
this.actionLock.unlock();
}
// not found, so we have to go find it on disk // not found, so we have to go find it on disk
return this.storage.get(key); return this.storage.get(key);
} }
/** /**
* Saves the given data to storage with the associated key. * Saves the given data to storage with the associated key.
@ -208,18 +215,11 @@ class DiskStorage implements Storage {
} }
if (timer != null) { if (timer != null) {
try { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
this.actionLock.lock(); // section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
if (object != null) {
// push action to map // push action to map
this.actionMap.put(key, object); actionMap.put(key, object);
}
else {
this.actionMap.remove(key);
}
} finally {
this.actionLock.unlock();
} }
// timer action runs on TIMER thread, not this thread // timer action runs on TIMER thread, not this thread
@ -243,6 +243,7 @@ class DiskStorage implements Storage {
// timer action runs on THIS thread, not timer thread // timer action runs on THIS thread, not timer thread
if (timer != null) { if (timer != null) {
// flush to storage, so we know if there were errors deleting from disk
this.timer.delay(0L); this.timer.delay(0L);
return this.storage.delete(key); return this.storage.delete(key);
} }

View File

@ -16,19 +16,29 @@
package dorkbox.util.storage; package dorkbox.util.storage;
import java.io.File; import java.io.File;
import java.util.concurrent.ConcurrentHashMap; import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/** /**
* Storage that is in memory only (and is not persisted to disk) * Storage that is in memory only (and is not persisted to disk)
*/ */
class MemoryStorage implements Storage { class MemoryStorage implements Storage {
private final ConcurrentHashMap<StorageKey, Object> storage;
// must be volatile
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private volatile HashMap<StorageKey, Object> storage = new HashMap<StorageKey, Object>();
private final Object singleWriterLock = new Object[0];
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<MemoryStorage, HashMap> storageREF =
AtomicReferenceFieldUpdater.newUpdater(MemoryStorage.class, HashMap.class, "storage");
private int version; private int version;
MemoryStorage() { MemoryStorage() {}
this.storage = new ConcurrentHashMap<StorageKey, Object>();
}
/** /**
@ -37,6 +47,8 @@ class MemoryStorage implements Storage {
@Override @Override
public public
int size() { int size() {
// access a snapshot of the storage (single-writer-principle)
HashMap storage = storageREF.get(this);
return storage.size(); return storage.size();
} }
@ -46,6 +58,8 @@ class MemoryStorage implements Storage {
@Override @Override
public public
boolean contains(final StorageKey key) { boolean contains(final StorageKey key) {
// access a snapshot of the storage (single-writer-principle)
HashMap storage = storageREF.get(this);
return storage.containsKey(key); return storage.containsKey(key);
} }
@ -56,6 +70,8 @@ class MemoryStorage implements Storage {
@Override @Override
public public
<T> T get(final StorageKey key) { <T> T get(final StorageKey key) {
// access a snapshot of the storage (single-writer-principle)
HashMap storage = storageREF.get(this);
return (T) storage.get(key); return (T) storage.get(key);
} }
@ -63,6 +79,9 @@ class MemoryStorage implements Storage {
@Override @Override
public public
<T> T get(final StorageKey key, final T data) { <T> T get(final StorageKey key, final T data) {
// access a snapshot of the storage (single-writer-principle)
HashMap storage = storageREF.get(this);
final Object o = storage.get(key); final Object o = storage.get(key);
if (o == null) { if (o == null) {
storage.put(key, data); storage.put(key, data);
@ -80,8 +99,12 @@ class MemoryStorage implements Storage {
@Override @Override
public public
void put(final StorageKey key, final Object object) { void put(final StorageKey key, final Object object) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
storage.put(key, object); storage.put(key, object);
} }
}
/** /**
* Deletes an object from storage. * Deletes an object from storage.
@ -89,9 +112,14 @@ class MemoryStorage implements Storage {
* @return true if the delete was successful. False if there were problems deleting the data. * @return true if the delete was successful. False if there were problems deleting the data.
*/ */
@Override @Override
public public synchronized
boolean delete(final StorageKey key) { boolean delete(final StorageKey key) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
storage.remove(key); storage.remove(key);
}
return true; return true;
} }
@ -173,6 +201,7 @@ class MemoryStorage implements Storage {
/** /**
* In-memory storage systems do not have a backing file, so there is nothing to close * In-memory storage systems do not have a backing file, so there is nothing to close
*/ */
@Override
public public
void close() { void close() {
StorageSystem.close(this); StorageSystem.close(this);

View File

@ -24,11 +24,12 @@ import java.io.RandomAccessFile;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -63,8 +64,17 @@ class StorageBase {
static final int FILE_HEADERS_REGION_LENGTH = 16; static final int FILE_HEADERS_REGION_LENGTH = 16;
// must be volatile
// The in-memory index (for efficiency, all of the record info is cached in memory). // The in-memory index (for efficiency, all of the record info is cached in memory).
private final Map<StorageKey, Metadata> memoryIndex; private volatile HashMap<StorageKey, Metadata> memoryIndex;
private final Object singleWriterLock = new Object[0];
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<StorageBase, HashMap> memoryREF =
AtomicReferenceFieldUpdater.newUpdater(StorageBase.class, HashMap.class, "memoryIndex");
// determines how much the index will grow by // determines how much the index will grow by
private final Float weight; private final Float weight;
@ -171,9 +181,12 @@ class StorageBase {
input = new Input(inputStream, BUFFER_SIZE); input = new Input(inputStream, BUFFER_SIZE);
//noinspection AutoBoxing
this.weight = 0.5F; this.weight = 0.5F;
this.memoryIndex = new ConcurrentHashMap<StorageKey, Metadata>(this.numberOfRecords);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
this.memoryIndex = new HashMap<StorageKey, Metadata>(this.numberOfRecords);
if (!newStorage) { if (!newStorage) {
Metadata meta; Metadata meta;
@ -194,6 +207,7 @@ class StorageBase {
} }
} }
} }
}
/** /**
* Returns the current number of records in the database. * Returns the current number of records in the database.
@ -202,7 +216,10 @@ class StorageBase {
int size() { int size() {
// wrapper flushes first (protected by lock) // wrapper flushes first (protected by lock)
// not protected by lock // not protected by lock
return this.memoryIndex.size();
// access a snapshot of the memoryIndex (single-writer-principle)
HashMap memoryIndex = memoryREF.get(this);
return memoryIndex.size();
} }
/** /**
@ -212,8 +229,9 @@ class StorageBase {
boolean contains(StorageKey key) { boolean contains(StorageKey key) {
// protected by lock // protected by lock
// check to see if it's in the pending ops // access a snapshot of the memoryIndex (single-writer-principle)
return this.memoryIndex.containsKey(key); HashMap memoryIndex = memoryREF.get(this);
return memoryIndex.containsKey(key);
} }
/** /**
@ -223,7 +241,11 @@ class StorageBase {
<T> T getCached(StorageKey key) { <T> T getCached(StorageKey key) {
// protected by lock // protected by lock
Metadata meta = this.memoryIndex.get(key);
// access a snapshot of the memoryIndex (single-writer-principle)
HashMap memoryIndex = memoryREF.get(this);
Metadata meta = (Metadata) memoryIndex.get(key);
if (meta == null) { if (meta == null) {
return null; return null;
} }
@ -255,7 +277,9 @@ class StorageBase {
<T> T get(StorageKey key) { <T> T get(StorageKey key) {
// NOT protected by lock // NOT protected by lock
Metadata meta = this.memoryIndex.get(key); // access a snapshot of the memoryIndex (single-writer-principle)
HashMap memoryIndex = memoryREF.get(this);
Metadata meta = (Metadata) memoryIndex.get(key);
if (meta == null) { if (meta == null) {
return null; return null;
} }
@ -318,12 +342,28 @@ class StorageBase {
final final
boolean delete(StorageKey key) { boolean delete(StorageKey key) {
// pending ops flushed (protected by lock) // pending ops flushed (protected by lock)
// not protected by lock
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
Metadata delRec = this.memoryIndex.get(key); Metadata delRec = this.memoryIndex.get(key);
try { try {
deleteRecordData(delRec, delRec.dataCapacity); deleteRecordData(delRec, delRec.dataCapacity);
deleteRecordIndex(key, delRec);
// delete the record index
int currentNumRecords = this.memoryIndex.size();
if (delRec.indexPosition != currentNumRecords - 1) {
Metadata last = Metadata.readHeader(this.randomAccessFile, currentNumRecords - 1);
assert last != null;
last.moveRecord(this.randomAccessFile, delRec.indexPosition);
}
this.memoryIndex.remove(key);
setRecordCount(this.randomAccessFile, currentNumRecords - 1);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
if (this.logger != null) { if (this.logger != null) {
@ -334,6 +374,7 @@ class StorageBase {
return false; return false;
} }
} }
}
/** /**
* Closes the database and file. * Closes the database and file.
@ -352,7 +393,12 @@ class StorageBase {
.sync(); .sync();
this.input.close(); this.input.close();
this.randomAccessFile.close(); this.randomAccessFile.close();
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
this.memoryIndex.clear(); this.memoryIndex.clear();
}
} catch (IOException e) { } catch (IOException e) {
if (this.logger != null) { if (this.logger != null) {
@ -401,7 +447,12 @@ class StorageBase {
*/ */
private private
void save0(StorageKey key, Object object) { void save0(StorageKey key, Object object) {
Metadata metaData = this.memoryIndex.get(key); Metadata metaData;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention.
synchronized (singleWriterLock) {
metaData = this.memoryIndex.get(key);
int currentRecordCount = this.numberOfRecords; int currentRecordCount = this.numberOfRecords;
if (metaData != null) { if (metaData != null) {
@ -468,7 +519,7 @@ class StorageBase {
// append record to end of file // append record to end of file
long length = this.randomAccessFile.length(); long length = this.randomAccessFile.length();
// System.err.println("--Writing data to: " + length); // System.err.println("--Writing data to: " + length);
metaData = new Metadata(key, currentRecordCount, length); metaData = new Metadata(key, currentRecordCount, length);
metaData.writeMetaDataInfo(this.randomAccessFile); metaData.writeMetaDataInfo(this.randomAccessFile);
@ -501,6 +552,7 @@ class StorageBase {
return; return;
} }
} }
}
// put the object in the reference cache so we can read/get it later on // put the object in the reference cache so we can read/get it later on
metaData.objectReferenceCache = new WeakReference<Object>(object); metaData.objectReferenceCache = new WeakReference<Object>(object);
@ -528,8 +580,8 @@ class StorageBase {
// items to be "autosaved" are automatically injected into "actions". // items to be "autosaved" are automatically injected into "actions".
final Set<Entry<StorageKey, Object>> entries = actions.entrySet(); final Set<Entry<StorageKey, Object>> entries = actions.entrySet();
for (Entry<StorageKey, Object> entry : entries) { for (Entry<StorageKey, Object> entry : entries) {
Object object = entry.getValue();
StorageKey key = entry.getKey(); StorageKey key = entry.getKey();
Object object = entry.getValue();
// our action list is for explicitly saving objects (but not necessarily "registering" them to be auto-saved // our action list is for explicitly saving objects (but not necessarily "registering" them to be auto-saved
save0(key, object); save0(key, object);
@ -557,6 +609,7 @@ class StorageBase {
} }
// protected by singleWriterLock
private private
void deleteRecordData(Metadata deletedRecord, int sizeOfDataToAdd) throws IOException { void deleteRecordData(Metadata deletedRecord, int sizeOfDataToAdd) throws IOException {
if (this.randomAccessFile.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) { if (this.randomAccessFile.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) {
@ -613,23 +666,6 @@ class StorageBase {
} }
} }
private
void deleteRecordIndex(StorageKey key, Metadata deleteRecord) throws IOException {
int currentNumRecords = this.memoryIndex.size();
if (deleteRecord.indexPosition != currentNumRecords - 1) {
Metadata last = Metadata.readHeader(this.randomAccessFile, currentNumRecords - 1);
assert last != null;
last.moveRecord(this.randomAccessFile, deleteRecord.indexPosition);
}
this.memoryIndex.remove(key);
setRecordCount(this.randomAccessFile, currentNumRecords - 1);
}
/** /**
* Writes the number of records header to the file. * Writes the number of records header to the file.
*/ */
@ -705,12 +741,13 @@ class StorageBase {
*/ */
private private
Metadata index_getMetaDataFromData(long targetFp) { Metadata index_getMetaDataFromData(long targetFp) {
Iterator<Metadata> iterator = this.memoryIndex.values() // access a snapshot of the memoryIndex (single-writer-principle)
.iterator(); HashMap memoryIndex = memoryREF.get(this);
Iterator iterator = memoryIndex.values().iterator();
//noinspection WhileLoopReplaceableByForEach //noinspection WhileLoopReplaceableByForEach
while (iterator.hasNext()) { while (iterator.hasNext()) {
Metadata next = iterator.next(); Metadata next = (Metadata) iterator.next();
if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) { if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) {
return next; return next;
} }