From 99b95074abf37d9297f895d8e43849871588544b Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 4 Aug 2017 17:33:54 +0200 Subject: [PATCH] Converted storage to use the 'single writer principle' for read speed --- src/dorkbox/util/storage/DiskStorage.java | 73 ++--- src/dorkbox/util/storage/MemoryStorage.java | 45 ++- src/dorkbox/util/storage/StorageBase.java | 305 +++++++++++--------- 3 files changed, 245 insertions(+), 178 deletions(-) diff --git a/src/dorkbox/util/storage/DiskStorage.java b/src/dorkbox/util/storage/DiskStorage.java index 19ad6f6..7a2071c 100644 --- a/src/dorkbox/util/storage/DiskStorage.java +++ b/src/dorkbox/util/storage/DiskStorage.java @@ -17,11 +17,11 @@ package dorkbox.util.storage; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; @@ -38,14 +38,23 @@ import dorkbox.util.SerializationManager; class DiskStorage implements Storage { // null if we are a read-only storage private final DelayTimer timer; + + + // must be volatile + private volatile HashMap actionMap = new HashMap(); + + private final Object singleWriterLock = new Object[0]; + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater actionMapREF = + AtomicReferenceFieldUpdater.newUpdater(DiskStorage.class, HashMap.class, "actionMap"); + private final StorageBase storage; private final AtomicInteger references = new AtomicInteger(1); - private final ReentrantLock actionLock = new ReentrantLock(); private final AtomicBoolean isOpen = new AtomicBoolean(false); private volatile long milliSeconds = 3000L; - private volatile Map actionMap = new ConcurrentHashMap(); /** * Creates or opens a new database file. @@ -61,16 +70,14 @@ class DiskStorage implements Storage { @Override public void run() { - ReentrantLock actionLock = DiskStorage.this.actionLock; - Map actions; - try { - actionLock.lock(); + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { // do a fast swap on the actionMap. actions = DiskStorage.this.actionMap; - DiskStorage.this.actionMap = new ConcurrentHashMap(); - } finally { - actionLock.unlock(); + DiskStorage.this.actionMap = new HashMap(); } DiskStorage.this.storage.doActionThings(actions); @@ -114,8 +121,11 @@ class DiskStorage implements Storage { throw new RuntimeException("Unable to act on closed storage"); } + // access a snapshot of the actionMap (single-writer-principle) + final HashMap actionMap = actionMapREF.get(this); + // check if our pending actions has it, or if our storage index has it - return this.actionMap.containsKey(key) || this.storage.contains(key); + return actionMap.containsKey(key) || this.storage.contains(key); } /** @@ -175,24 +185,21 @@ class DiskStorage implements Storage { throw new RuntimeException("Unable to act on closed storage"); } + // access a snapshot of the actionMap (single-writer-principle) + final HashMap actionMap = actionMapREF.get(this); + // if the object in is pending, we get it from there - try { - this.actionLock.lock(); + Object object = actionMap.get(key); - Object object = this.actionMap.get(key); - - if (object != null) { - @SuppressWarnings("unchecked") - T returnObject = (T) object; - return returnObject; - } - } finally { - this.actionLock.unlock(); + if (object != null) { + @SuppressWarnings("unchecked") + T returnObject = (T) object; + return returnObject; } // not found, so we have to go find it on disk return this.storage.get(key); - } +} /** * Saves the given data to storage with the associated key. @@ -208,18 +215,11 @@ class DiskStorage implements Storage { } if (timer != null) { - try { - this.actionLock.lock(); - - if (object != null) { - // push action to map - this.actionMap.put(key, object); - } - else { - this.actionMap.remove(key); - } - } finally { - this.actionLock.unlock(); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + // push action to map + actionMap.put(key, object); } // timer action runs on TIMER thread, not this thread @@ -243,6 +243,7 @@ class DiskStorage implements Storage { // timer action runs on THIS thread, not timer thread if (timer != null) { + // flush to storage, so we know if there were errors deleting from disk this.timer.delay(0L); return this.storage.delete(key); } diff --git a/src/dorkbox/util/storage/MemoryStorage.java b/src/dorkbox/util/storage/MemoryStorage.java index 3094f4b..a086be1 100644 --- a/src/dorkbox/util/storage/MemoryStorage.java +++ b/src/dorkbox/util/storage/MemoryStorage.java @@ -16,19 +16,29 @@ package dorkbox.util.storage; import java.io.File; -import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Storage that is in memory only (and is not persisted to disk) */ class MemoryStorage implements Storage { - private final ConcurrentHashMap storage; + + + // must be volatile + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private volatile HashMap storage = new HashMap(); + + private final Object singleWriterLock = new Object[0]; + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater storageREF = + AtomicReferenceFieldUpdater.newUpdater(MemoryStorage.class, HashMap.class, "storage"); + private int version; - MemoryStorage() { - this.storage = new ConcurrentHashMap(); - } + MemoryStorage() {} /** @@ -37,6 +47,8 @@ class MemoryStorage implements Storage { @Override public int size() { + // access a snapshot of the storage (single-writer-principle) + HashMap storage = storageREF.get(this); return storage.size(); } @@ -46,6 +58,8 @@ class MemoryStorage implements Storage { @Override public boolean contains(final StorageKey key) { + // access a snapshot of the storage (single-writer-principle) + HashMap storage = storageREF.get(this); return storage.containsKey(key); } @@ -56,6 +70,8 @@ class MemoryStorage implements Storage { @Override public T get(final StorageKey key) { + // access a snapshot of the storage (single-writer-principle) + HashMap storage = storageREF.get(this); return (T) storage.get(key); } @@ -63,6 +79,9 @@ class MemoryStorage implements Storage { @Override public T get(final StorageKey key, final T data) { + // access a snapshot of the storage (single-writer-principle) + HashMap storage = storageREF.get(this); + final Object o = storage.get(key); if (o == null) { storage.put(key, data); @@ -80,7 +99,11 @@ class MemoryStorage implements Storage { @Override public void put(final StorageKey key, final Object object) { - storage.put(key, object); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + storage.put(key, object); + } } /** @@ -89,9 +112,14 @@ class MemoryStorage implements Storage { * @return true if the delete was successful. False if there were problems deleting the data. */ @Override - public + public synchronized boolean delete(final StorageKey key) { - storage.remove(key); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + storage.remove(key); + } + return true; } @@ -173,6 +201,7 @@ class MemoryStorage implements Storage { /** * In-memory storage systems do not have a backing file, so there is nothing to close */ + @Override public void close() { StorageSystem.close(this); diff --git a/src/dorkbox/util/storage/StorageBase.java b/src/dorkbox/util/storage/StorageBase.java index e45c041..4b0a62e 100644 --- a/src/dorkbox/util/storage/StorageBase.java +++ b/src/dorkbox/util/storage/StorageBase.java @@ -24,11 +24,12 @@ import java.io.RandomAccessFile; import java.lang.ref.WeakReference; import java.nio.channels.Channels; import java.nio.channels.FileLock; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -63,8 +64,17 @@ class StorageBase { static final int FILE_HEADERS_REGION_LENGTH = 16; + + // must be volatile // The in-memory index (for efficiency, all of the record info is cached in memory). - private final Map memoryIndex; + private volatile HashMap memoryIndex; + + private final Object singleWriterLock = new Object[0]; + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater memoryREF = + AtomicReferenceFieldUpdater.newUpdater(StorageBase.class, HashMap.class, "memoryIndex"); + // determines how much the index will grow by private final Float weight; @@ -171,25 +181,29 @@ class StorageBase { input = new Input(inputStream, BUFFER_SIZE); - //noinspection AutoBoxing this.weight = 0.5F; - this.memoryIndex = new ConcurrentHashMap(this.numberOfRecords); - if (!newStorage) { - Metadata meta; - for (int index = 0; index < this.numberOfRecords; index++) { - meta = Metadata.readHeader(this.randomAccessFile, index); - if (meta == null) { - // because we guarantee that empty metadata are ALWAYS at the end of the section, if we get a null one, break! - break; + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + this.memoryIndex = new HashMap(this.numberOfRecords); + + if (!newStorage) { + Metadata meta; + for (int index = 0; index < this.numberOfRecords; index++) { + meta = Metadata.readHeader(this.randomAccessFile, index); + if (meta == null) { + // because we guarantee that empty metadata are ALWAYS at the end of the section, if we get a null one, break! + break; + } + this.memoryIndex.put(meta.key, meta); } - this.memoryIndex.put(meta.key, meta); - } - if (this.memoryIndex.size() != (this.numberOfRecords)) { - setRecordCount(this.randomAccessFile, this.memoryIndex.size()); - if (logger != null) { - logger.warn("Mismatch record count in storage, auto-correcting size."); + if (this.memoryIndex.size() != (this.numberOfRecords)) { + setRecordCount(this.randomAccessFile, this.memoryIndex.size()); + if (logger != null) { + logger.warn("Mismatch record count in storage, auto-correcting size."); + } } } } @@ -202,7 +216,10 @@ class StorageBase { int size() { // wrapper flushes first (protected by lock) // not protected by lock - return this.memoryIndex.size(); + + // access a snapshot of the memoryIndex (single-writer-principle) + HashMap memoryIndex = memoryREF.get(this); + return memoryIndex.size(); } /** @@ -212,8 +229,9 @@ class StorageBase { boolean contains(StorageKey key) { // protected by lock - // check to see if it's in the pending ops - return this.memoryIndex.containsKey(key); + // access a snapshot of the memoryIndex (single-writer-principle) + HashMap memoryIndex = memoryREF.get(this); + return memoryIndex.containsKey(key); } /** @@ -223,7 +241,11 @@ class StorageBase { T getCached(StorageKey key) { // protected by lock - Metadata meta = this.memoryIndex.get(key); + + // access a snapshot of the memoryIndex (single-writer-principle) + HashMap memoryIndex = memoryREF.get(this); + Metadata meta = (Metadata) memoryIndex.get(key); + if (meta == null) { return null; } @@ -255,7 +277,9 @@ class StorageBase { T get(StorageKey key) { // NOT protected by lock - Metadata meta = this.memoryIndex.get(key); + // access a snapshot of the memoryIndex (single-writer-principle) + HashMap memoryIndex = memoryREF.get(this); + Metadata meta = (Metadata) memoryIndex.get(key); if (meta == null) { return null; } @@ -318,20 +342,37 @@ class StorageBase { final boolean delete(StorageKey key) { // pending ops flushed (protected by lock) - // not protected by lock - Metadata delRec = this.memoryIndex.get(key); - try { - deleteRecordData(delRec, delRec.dataCapacity); - deleteRecordIndex(key, delRec); - return true; - } catch (IOException e) { - if (this.logger != null) { - this.logger.error("Error while deleting data from disk", e); - } else { - e.printStackTrace(); + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + Metadata delRec = this.memoryIndex.get(key); + + try { + deleteRecordData(delRec, delRec.dataCapacity); + + // delete the record index + int currentNumRecords = this.memoryIndex.size(); + if (delRec.indexPosition != currentNumRecords - 1) { + Metadata last = Metadata.readHeader(this.randomAccessFile, currentNumRecords - 1); + assert last != null; + + last.moveRecord(this.randomAccessFile, delRec.indexPosition); + } + this.memoryIndex.remove(key); + + + setRecordCount(this.randomAccessFile, currentNumRecords - 1); + + return true; + } catch (IOException e) { + if (this.logger != null) { + this.logger.error("Error while deleting data from disk", e); + } else { + e.printStackTrace(); + } + return false; } - return false; } } @@ -352,7 +393,12 @@ class StorageBase { .sync(); this.input.close(); this.randomAccessFile.close(); - this.memoryIndex.clear(); + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + this.memoryIndex.clear(); + } } catch (IOException e) { if (this.logger != null) { @@ -401,104 +447,110 @@ class StorageBase { */ private void save0(StorageKey key, Object object) { - Metadata metaData = this.memoryIndex.get(key); - int currentRecordCount = this.numberOfRecords; + Metadata metaData; - if (metaData != null) { - // now we have to UPDATE instead of add! - try { - if (currentRecordCount == 1) { - // if we are the ONLY one, then we can do things differently. - // just dump the data again to disk. - FileLock lock = this.randomAccessFile.getChannel() - .lock(this.dataPosition, - Long.MAX_VALUE - this.dataPosition, - false); // don't know how big it is, so max value it + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention. + synchronized (singleWriterLock) { + metaData = this.memoryIndex.get(key); + int currentRecordCount = this.numberOfRecords; - this.randomAccessFile.seek(this.dataPosition); // this is the end of the file, we know this ahead-of-time - Metadata.writeData(this.serializationManager, object, this.output); - // have to re-specify the capacity and size - //noinspection NumericCastThatLosesPrecision - int sizeOfWrittenData = (int) (this.randomAccessFile.length() - this.dataPosition); + if (metaData != null) { + // now we have to UPDATE instead of add! + try { + if (currentRecordCount == 1) { + // if we are the ONLY one, then we can do things differently. + // just dump the data again to disk. + FileLock lock = this.randomAccessFile.getChannel() + .lock(this.dataPosition, + Long.MAX_VALUE - this.dataPosition, + false); // don't know how big it is, so max value it - metaData.dataCapacity = sizeOfWrittenData; - metaData.dataCount = sizeOfWrittenData; + this.randomAccessFile.seek(this.dataPosition); // this is the end of the file, we know this ahead-of-time + Metadata.writeData(this.serializationManager, object, this.output); + // have to re-specify the capacity and size + //noinspection NumericCastThatLosesPrecision + int sizeOfWrittenData = (int) (this.randomAccessFile.length() - this.dataPosition); - lock.release(); - } - else { - // this is comparatively slow, since we serialize it first to get the size, then we put it in the file. - ByteArrayOutputStream dataStream = getDataAsByteArray(this.serializationManager, this.logger, object); + metaData.dataCapacity = sizeOfWrittenData; + metaData.dataCount = sizeOfWrittenData; - int size = dataStream.size(); - if (size > metaData.dataCapacity) { - deleteRecordData(metaData, size); - // stuff this record to the end of the file, since it won't fit in it's current location - metaData.dataPointer = this.randomAccessFile.length(); - // have to make sure that the CAPACITY of the new one is the SIZE of the new data! - // and since it is going to the END of the file, we do that. - metaData.dataCapacity = size; - metaData.dataCount = 0; + lock.release(); + } + else { + // this is comparatively slow, since we serialize it first to get the size, then we put it in the file. + ByteArrayOutputStream dataStream = getDataAsByteArray(this.serializationManager, this.logger, object); + + int size = dataStream.size(); + if (size > metaData.dataCapacity) { + deleteRecordData(metaData, size); + // stuff this record to the end of the file, since it won't fit in it's current location + metaData.dataPointer = this.randomAccessFile.length(); + // have to make sure that the CAPACITY of the new one is the SIZE of the new data! + // and since it is going to the END of the file, we do that. + metaData.dataCapacity = size; + metaData.dataCount = 0; + } + + // TODO: should check to see if the data is different. IF SO, then we write, otherwise nothing! + + metaData.writeDataRaw(dataStream, this.randomAccessFile); } - // TODO: should check to see if the data is different. IF SO, then we write, otherwise nothing! - - metaData.writeDataRaw(dataStream, this.randomAccessFile); - } - - metaData.writeDataInfo(this.randomAccessFile); - } catch (IOException e) { - if (this.logger != null) { - this.logger.error("Error while saving data to disk", e); - } else { - e.printStackTrace(); + metaData.writeDataInfo(this.randomAccessFile); + } catch (IOException e) { + if (this.logger != null) { + this.logger.error("Error while saving data to disk", e); + } else { + e.printStackTrace(); + } } } - } - else { - // metadata == null... - try { - // set the number of records that this storage has - setRecordCount(this.randomAccessFile, currentRecordCount + 1); + else { + // metadata == null... + try { + // set the number of records that this storage has + setRecordCount(this.randomAccessFile, currentRecordCount + 1); - // This will make sure that there is room to write a new record. This is zero indexed. - // this will skip around if moves occur - ensureIndexCapacity(this.randomAccessFile); + // This will make sure that there is room to write a new record. This is zero indexed. + // this will skip around if moves occur + ensureIndexCapacity(this.randomAccessFile); - // append record to end of file - long length = this.randomAccessFile.length(); + // append record to end of file + long length = this.randomAccessFile.length(); -// System.err.println("--Writing data to: " + length); + // System.err.println("--Writing data to: " + length); - metaData = new Metadata(key, currentRecordCount, length); - metaData.writeMetaDataInfo(this.randomAccessFile); + metaData = new Metadata(key, currentRecordCount, length); + metaData.writeMetaDataInfo(this.randomAccessFile); - // add new entry to the index - this.memoryIndex.put(key, metaData); + // add new entry to the index + this.memoryIndex.put(key, metaData); - // save out the data. Because we KNOW that we are writing this to the end of the file, - // there are some tricks we can use. + // save out the data. Because we KNOW that we are writing this to the end of the file, + // there are some tricks we can use. - // don't know how big it is, so max value it - FileLock lock = this.randomAccessFile.getChannel() - .lock(0, Long.MAX_VALUE, false); + // don't know how big it is, so max value it + FileLock lock = this.randomAccessFile.getChannel() + .lock(0, Long.MAX_VALUE, false); - // this is the end of the file, we know this ahead-of-time - this.randomAccessFile.seek(length); + // this is the end of the file, we know this ahead-of-time + this.randomAccessFile.seek(length); - int total = Metadata.writeData(this.serializationManager, object, this.output); - lock.release(); + int total = Metadata.writeData(this.serializationManager, object, this.output); + lock.release(); - metaData.dataCount = metaData.dataCapacity = total; - // have to save it. - metaData.writeDataInfo(this.randomAccessFile); - } catch (IOException e) { - if (this.logger != null) { - this.logger.error("Error while writing data to disk", e); - } else { - e.printStackTrace(); + metaData.dataCount = metaData.dataCapacity = total; + // have to save it. + metaData.writeDataInfo(this.randomAccessFile); + } catch (IOException e) { + if (this.logger != null) { + this.logger.error("Error while writing data to disk", e); + } else { + e.printStackTrace(); + } + return; } - return; } } @@ -528,8 +580,8 @@ class StorageBase { // items to be "autosaved" are automatically injected into "actions". final Set> entries = actions.entrySet(); for (Entry entry : entries) { - Object object = entry.getValue(); StorageKey key = entry.getKey(); + Object object = entry.getValue(); // our action list is for explicitly saving objects (but not necessarily "registering" them to be auto-saved save0(key, object); @@ -557,6 +609,7 @@ class StorageBase { } + // protected by singleWriterLock private void deleteRecordData(Metadata deletedRecord, int sizeOfDataToAdd) throws IOException { if (this.randomAccessFile.length() == deletedRecord.dataPointer + deletedRecord.dataCapacity) { @@ -613,23 +666,6 @@ class StorageBase { } } - private - void deleteRecordIndex(StorageKey key, Metadata deleteRecord) throws IOException { - int currentNumRecords = this.memoryIndex.size(); - - if (deleteRecord.indexPosition != currentNumRecords - 1) { - Metadata last = Metadata.readHeader(this.randomAccessFile, currentNumRecords - 1); - assert last != null; - - last.moveRecord(this.randomAccessFile, deleteRecord.indexPosition); - } - - this.memoryIndex.remove(key); - - setRecordCount(this.randomAccessFile, currentNumRecords - 1); - } - - /** * Writes the number of records header to the file. */ @@ -705,12 +741,13 @@ class StorageBase { */ private Metadata index_getMetaDataFromData(long targetFp) { - Iterator iterator = this.memoryIndex.values() - .iterator(); + // access a snapshot of the memoryIndex (single-writer-principle) + HashMap memoryIndex = memoryREF.get(this); + Iterator iterator = memoryIndex.values().iterator(); //noinspection WhileLoopReplaceableByForEach while (iterator.hasNext()) { - Metadata next = iterator.next(); + Metadata next = (Metadata) iterator.next(); if (targetFp >= next.dataPointer && targetFp < next.dataPointer + next.dataCapacity) { return next; }