// VersionDictionary.cpp -- file header, includes, the on-disk Trailer record, and the
// "open the latest version" constructor.
//
// VersionDictionary::Trailer is a Chunk subclass holding, in declaration order: magicNumber1,
// count, indexPositions, previousTrailerPosition, generation, timestamp and magicNumber2.
// As the original comment says, it is stored in the file and all numbers are little-endian.
//   * The default constructor sets up only the Chunk base (sizeof(Trailer), kChunkType);
//     the other fields are not given values by that constructor's initializer list.
//   * The 4-argument constructor fills every field: the two magic numbers come from
//     kMagicNumber1/kMagicNumber2 and the timestamp from ::time(NULL).
//
// VersionDictionary(File*) zero-initializes its members and calls _read() only when
// file->length() > sizeof(Trailer); for a shorter file nothing is read and the object
// is left with count 0 and trailer position 0.
//
// NOTE(review): this copy carries per-line annotate prefixes, and the four bare `#include`
// directives and the un-parameterized `LittleEndian` field types look like angle-bracket
// text was lost in extraction -- confirm against the repository before building.
jens@0: /* jens@0: * VersionDictionary.cpp jens@0: * Ottoman jens@0: * jens@0: * Created by Jens Alfke on 8/21/09. jens@0: * Copyright 2009 Jens Alfke. All rights reserved. jens@0: * BSD-Licensed: See the file "LICENSE.txt" for details. jens@0: */ jens@0: jens@0: #include "VersionDictionary.h" jens@0: #include "Chunk.h" jens@0: #include "Index.h" jens@0: #include "File.h" jens@0: #include "Chunk.h" jens@0: #include jens@0: #include jens@0: #include jbm@8: #include jbm@8: jbm@8: # define UINT32_MAX (4294967295U) jbm@8: jens@0: jens@0: namespace Mooseyard { jens@0: jens@0: // Trailer is stored in file; all numbers are little-endian. jens@0: class VersionDictionary::Trailer :public Chunk { jens@0: public: jens@0: LittleEndian magicNumber1; jens@0: LittleEndian count; jens@0: VersionDictionary::IndexPositions indexPositions; jens@0: LittleEndian previousTrailerPosition; jens@0: LittleEndian generation; jens@0: LittleEndianDouble timestamp; jens@0: LittleEndian magicNumber2; jens@0: jens@0: Trailer () jens@0: :Chunk(sizeof(Trailer), kChunkType) jens@0: { } jens@0: jens@0: Trailer (int c, jens@0: VersionDictionary::IndexPositions indexPos, jens@0: FilePosition previousTrailerPos, jens@0: uint32_t gen) jens@0: :Chunk(sizeof(Trailer), kChunkType), jens@0: magicNumber1(kMagicNumber1), jens@0: count(c), jens@0: indexPositions(indexPos), jens@0: previousTrailerPosition(previousTrailerPos), jens@0: generation(gen), jens@0: timestamp(::time(NULL)), jens@0: magicNumber2(kMagicNumber2) jens@0: { } jens@0: jens@0: static const uint32_t kMagicNumber1 = 0xfe83b1cd; jens@0: static const uint32_t kMagicNumber2 = 0xff84b2c1; jens@0: }; jens@0: jens@0: jens@0: jens@0: VersionDictionary::VersionDictionary (File *file) jens@0: :_file(file), jens@0: _trailerPosition(0), jens@0: _previousTrailerPosition(0), jens@0: _indexPositions(), jens@0: _count(0), jens@0: _previousVersion() jens@0: { jens@0: if (file->length() > sizeof(Trailer)) jens@0: _read(); jens@0: } jens@0: jens@0: 
VersionDictionary::VersionDictionary (File *file, FilePosition trailerPosition) jens@0: :_file(file), jens@0: _trailerPosition(trailerPosition), jens@0: _previousTrailerPosition(0), jens@0: _indexPositions(), jens@0: _count(0), jens@0: _previousVersion() jens@0: { jens@0: _read(trailerPosition); jens@0: } jens@0: jens@0: int VersionDictionary::count() const { jens@0: return _count; jens@0: } jens@0: jens@0: const VersionDictionary::Trailer* VersionDictionary::_trailer() const { jens@0: if (_trailerPosition > 0) jens@0: return (const VersionDictionary::Trailer*) _file->mappedPosition(_trailerPosition); jens@0: else jens@0: return NULL; // Only happens in the empty-file case jens@0: } jens@0: jens@0: const Index* VersionDictionary::_index (int i) const { jens@0: if (_indexPositions[i] > 0) jens@0: return (const Index*) _file->mappedPosition(_indexPositions[i]); jens@0: else jens@0: return NULL; jens@0: } jens@0: jens@0: int VersionDictionary::generation() const { jens@0: const VersionDictionary::Trailer *trailer = _trailer(); jens@0: return trailer ? (int)trailer->generation : -1; jens@0: } jens@0: jens@0: time_t VersionDictionary::timestamp() const { jens@0: const VersionDictionary::Trailer *trailer = _trailer(); jens@0: return trailer ? 
(time_t)trailer->timestamp : 0; jens@0: } jens@0: jens@0: const VersionDictionary* VersionDictionary::previousVersion() const { jens@0: if (!_previousVersion) jens@0: if (_previousTrailerPosition > 0) jens@0: _previousVersion = new VersionDictionary(_file, _previousTrailerPosition); jens@0: return _previousVersion; jens@0: } jens@0: jens@0: jens@0: Blob VersionDictionary::get (Key key) const { jens@0: const Index *index = _index(key.hash >> 24); jens@0: return index ?index->get(_file, key) :Blob(); jens@0: } jens@0: jens@0: void VersionDictionary::_read (FilePosition trailerPos) { jens@0: assert(_file); jens@0: jens@0: if (trailerPos > 0) { jens@0: _file->setPosition(trailerPos); jens@0: } else { jens@0: // Determine position of trailer, at EOF: jens@0: off_t pos = _file->setPositionToEnd(sizeof(VersionDictionary::Trailer)); jens@0: if (pos < 0 || (pos & 0x03) || pos > UINT32_MAX) jens@0: throw File::Error(ERANGE, "No trailer found in file (wrong EOF)"); jens@2: trailerPos = (FilePosition)pos; jens@0: } jens@0: jens@0: // Read & verify trailer: jens@0: VersionDictionary::Trailer trailer; jens@0: _file->read(trailer); jens@0: _trailerPosition = trailerPos; jens@5: _previousTrailerPosition = trailer.previousTrailerPosition; jens@0: _count = trailer.count; jens@0: _indexPositions = trailer.indexPositions; jens@0: jens@0: if (trailer.magicNumber1 != VersionDictionary::Trailer::kMagicNumber1 jens@0: || trailer.magicNumber2 != VersionDictionary::Trailer::kMagicNumber2) jens@5: throw File::Error("No trailer found in file (invalid magic numbers)"); jens@5: if (_previousTrailerPosition >= _trailerPosition) jens@5: throw File::Error("Bad VersionDictionary trailer (illegal previousTrailerPosition)"); jens@0: jens@0: // Map in the file: jens@0: _file->mapRegion(0, _trailerPosition+sizeof(trailer)); jens@0: jens@0: // Verify Indexes: jens@0: for (int i=0; i<256; i++) { jens@5: if (_indexPositions[i] > 0) jens@5: if (_indexPositions[i] < _previousTrailerPosition || 
_indexPositions[i] >= _trailerPosition) jens@5: throw File::Error("Bad VersionDictionary trailer (illegal index position)"); jens@0: const Index *index = _index(i); jens@0: if (index) jens@0: index->validate(); jens@0: } jens@0: } jens@0: jens@0: jens@0: bool VersionDictionary::isLatestVersion() const { jens@0: return _file->length() <= _trailerPosition+sizeof(VersionDictionary::Trailer); jens@0: } jens@0: jens@0: jens@0: /* Append addDict to baseDict, writing the results into dstFile (which is usually the same jens@0: as baseDict->file().) If 'replace' is true, ignore the old contents of baseDict. jens@0: Returns the position of the new trailer. */ jens@0: FilePosition VersionDictionary::_append (const VersionDictionary *baseDict, jens@0: const Dictionary *addDict, jens@0: File *dstFile, jens@0: bool replace) jens@0: { jens@0: File *srcFile = baseDict->_file; jens@0: bool incremental = !replace && dstFile==srcFile; jens@0: Index* newIndex[256]; jens@0: jens@0: { jens@0: // Work out the needed capacity for each Index bucket: jens@0: int newCounts[256] = {0}; jens@0: if (!replace) { jens@0: for (int i=0; i<256; i++) { jens@0: const Index *oldIndex = baseDict->_index(i); jens@0: if (oldIndex) jens@0: newCounts[i] = oldIndex->count(); jens@0: } jens@0: } jens@0: Dictionary::Iterator *it = addDict->iterate(); jens@0: for (; *it; ++*it) jens@0: newCounts[it->key().hash >> 24]++; jens@0: delete it; jens@0: jens@0: // Allocate new Indexes, of sufficient capacity, for each growing bucket: jens@0: for (int i=0; i<256; i++) { jens@0: const Index *oldIndex = baseDict->_index(i); jens@0: if (newCounts[i] && (!incremental || !oldIndex || newCounts[i] > oldIndex->count())) { jens@0: newIndex[i] = Index::create(newCounts[i]); jens@0: if (incremental && oldIndex) jens@0: newIndex[i]->copyFrom(srcFile, oldIndex); jens@0: } else jens@0: newIndex[i] = NULL; jens@0: } jens@0: } jens@0: jens@0: // Lock the file now, seek to the end, and make sure it's been prepared with a header, 
jens@0: // since FilePositions of 0 and 1 are reserved. jens@0: File::Lock lock(dstFile); jens@2: const FilePosition startPos = (FilePosition) dstFile->setPositionToEnd(); jens@0: if (startPos <= 1) jens@0: throw File::Error(ERANGE, "Cannot write VersionDictionary to empty file"); jens@0: jens@0: // For safety's sake, make sure the old file hasn't been destroyed: jens@0: FilePosition oldEOF = baseDict->_trailerPosition; jens@0: if (oldEOF > 0) jens@0: oldEOF += sizeof(VersionDictionary::Trailer); jens@0: if (srcFile->length() < oldEOF) jens@0: throw File::Error(ERANGE, "File has been truncated since being read"); jens@0: jens@0: try{ jens@0: FilePosition pos = startPos; jens@0: jens@0: int newCount; jens@0: if (replace) jens@0: newCount = 0; jens@0: else if (dstFile == srcFile) jens@0: newCount = baseDict->_count; jens@0: else { jens@0: // Write out the surviving pre-existing entries from the old file: jens@0: newCount = 0; jens@0: for (VersionDictionary::Iterator it(baseDict); it; ++it) { jens@0: Key key = it.key(); jens@0: if (!addDict->contains(key)) { jens@0: int bucket = key.hash >> 24; jens@0: int size = KeyValueChunk::write(dstFile,pos, key, it.value()); jens@0: newIndex[bucket]->put(srcFile, key, pos, startPos); jens@0: pos += size; jens@0: newCount++; jens@0: } jens@0: } jens@0: } jens@0: jens@0: // Now write the items from the new dict: jens@0: Dictionary::Iterator *it = addDict->iterate(); jens@0: for (; *it; ++*it) { jens@0: Key key=it->key(); jens@0: Blob value=it->value(); jens@0: int bucket = key.hash >> 24; jens@0: Index *index = newIndex[bucket]; jens@0: assert(index); jens@0: jens@0: if (value.bytes) { jens@0: int size = KeyValueChunk::write(dstFile,pos, key, value); jens@0: if (index->put(srcFile, key, pos, startPos)) jens@0: newCount++; jens@0: pos += size; jens@0: } else if (incremental) { jens@0: // NULL value is a deleted-entry marker used by OverlayDictionary jens@0: if (index->remove(srcFile, key, startPos)) jens@0: newCount--; jens@0: } 
jens@0: } jens@0: delete it; jens@0: jens@0: // Write out the new indexes: jens@0: IndexPositions newIndexPositions; jens@0: if (incremental) jens@0: newIndexPositions = baseDict->_indexPositions; jens@0: else jens@0: memset(&newIndexPositions, 0, sizeof(newIndexPositions)); jens@0: jens@0: pos += Chunk::writePadding(dstFile); jens@0: for (int i=0; i<256; i++) { jens@0: if (newIndex[i]) { jens@0: Index *index = newIndex[i]; jens@0: newIndexPositions[i] = pos; jens@0: pos += dstFile->write(index, index->size()); jens@0: delete index; jens@0: } jens@0: } jens@0: jens@0: // Flush everything out to disk, with maximum paranoia, before writing the trailer. jens@0: // Since scavenging corrupt files looks for trailers, we don't want to append a jens@0: // trailer until we're certain that all of the real data is safely on-disk. jens@0: dstFile->flushDisk(); jens@0: jens@0: // Write the trailer: jens@0: FilePosition newTrailerPosition = pos; jens@0: VersionDictionary::Trailer trailer(newCount, jens@0: newIndexPositions, jens@0: baseDict->_trailerPosition, jens@0: baseDict->generation() + 1); jens@0: pos += dstFile->write(trailer); jens@0: jens@0: // Just a mild flush here; flushDisk() is very expensive and can be disruptive to jens@0: // real-time I/O in other apps, so it's bad to call it too often. jens@0: dstFile->flush(); jens@0: assert(pos==dstFile->position()); jens@0: jens@0: return newTrailerPosition; jens@0: jens@0: } catch (...) { jens@0: // If something goes wrong, try to back out everything we wrote: jens@0: try{ jens@0: dstFile->setLength(startPos); jens@0: }catch(...) 
{ } jens@0: throw; jens@0: } jens@0: } jens@0: jens@0: jens@0: #pragma mark - jens@0: #pragma mark TESTING-ONLY: jens@0: jens@0: VersionDictionary* VersionDictionary::_appendAndOpen (const Dictionary *addDict, jens@0: File *dstFile, jens@0: bool replace) const jens@0: { jens@0: FilePosition nextVersionPos = _append(this, addDict, dstFile, replace); jens@0: VersionDictionary *nextVersion = new VersionDictionary(dstFile, nextVersionPos); jens@0: nextVersion->_previousVersion = this; jens@0: return nextVersion; jens@0: } jens@0: jens@0: VersionDictionary* VersionDictionary::create (File *file, const Dictionary *srcDict) { jens@0: return VersionDictionary(file)._appendAndOpen(srcDict, file, true); jens@0: } jens@0: jens@0: jens@0: #pragma mark - jens@0: #pragma mark ITERATOR: jens@0: jens@0: Dictionary::Iterator* VersionDictionary::iterate() const { jens@0: return new VersionDictionary::Iterator(this); jens@0: } jens@0: jens@0: VersionDictionary::Iterator::Iterator (const VersionDictionary *file) jens@0: :_file(file), jens@0: _bucket(-1), jens@0: _iter(NULL) jens@0: { jens@0: nextIndex(); jens@0: } jens@0: jens@0: VersionDictionary::Iterator::~Iterator() { jens@0: delete _iter; jens@0: } jens@0: jens@0: bool VersionDictionary::Iterator::hasValue() const {return _iter && _iter->hasValue();} jens@0: Key VersionDictionary::Iterator::key() const {return _iter->key();} jens@0: Blob VersionDictionary::Iterator::value() const {return _iter->value();} jens@0: jens@0: bool VersionDictionary::Iterator::next() { jens@0: if (_iter->next()) jens@0: return true; jens@0: else { jens@0: delete _iter; jens@0: return nextIndex(); jens@0: } jens@0: } jens@0: jens@0: bool VersionDictionary::Iterator::nextIndex() { jens@0: while (++_bucket < 256) { jens@0: const Index *index = _file->_index(_bucket); jens@0: if (index) { jens@0: _iter = new Index::Iterator(_file->_file, index); jens@0: if (*_iter) jens@0: return true; jens@0: else jens@0: delete _iter; jens@0: } jens@0: } jens@0: _iter = 
NULL; jens@0: return false; jens@0: } jens@0: jens@0: jens@0: #pragma mark - jens@0: #pragma mark CHANGE ITERATOR: jens@0: jens@0: VersionDictionary::ChangeIterator* VersionDictionary::iterateChanges() const { jens@0: return new VersionDictionary::ChangeIterator(this); jens@0: } jens@0: jens@0: VersionDictionary::ChangeIterator::ChangeIterator (const VersionDictionary *file) jens@0: :_file(file), jens@0: _bucket(-1), jens@0: _iter(NULL) jens@0: { jens@0: next(); jens@0: } jens@0: jens@0: VersionDictionary::ChangeIterator::~ChangeIterator() { jens@0: delete _iter; jens@0: } jens@0: jens@0: bool VersionDictionary::ChangeIterator::hasValue() const {return _iter && _iter->hasValue();} jens@0: Key VersionDictionary::ChangeIterator::key() const {return _iter->key();} jens@0: Blob VersionDictionary::ChangeIterator::value() const {return _iter->value();} jens@0: jens@0: bool VersionDictionary::ChangeIterator::next() { jens@0: const VersionDictionary::Trailer *trailer = _file->_trailer(); jens@0: for (;;) { jens@0: // Check if current iterator has a value that's from this version: jens@0: if (_iter && _iter->hasValue()) { jens@0: if (((Index::Iterator*)_iter)->valuePosition() > trailer->previousTrailerPosition) jens@0: return true; jens@0: else if (_iter->next()) jens@0: continue; jens@0: } jens@0: // If not, go to next Index: jens@0: if (!nextIndex()) jens@0: return false; jens@0: } jens@0: } jens@0: jens@0: bool VersionDictionary::ChangeIterator::nextIndex() { jens@0: delete _iter; jens@0: const VersionDictionary::Trailer *trailer = _file->_trailer(); jens@0: while (++_bucket < 256) { jens@0: const Index *index = _file->_index(_bucket); jens@0: if (index) { jens@0: // Skip indexes that weren't updated in this version: jens@0: if (trailer->indexPositions[_bucket] > trailer->previousTrailerPosition) { jens@0: _iter = new Index::Iterator(_file->_file, index); jens@0: return true; jens@0: } jens@0: } jens@0: } jens@0: _iter = NULL; jens@0: return false; jens@0: } jens@0: 
jens@0: }