diff -r 000000000000 -r 31a43d94cc26 src/VersionDictionary.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/VersionDictionary.cpp Sun Sep 20 15:14:12 2009 -0700 @@ -0,0 +1,429 @@ +/* + * VersionDictionary.cpp + * Ottoman + * + * Created by Jens Alfke on 8/21/09. + * Copyright 2009 Jens Alfke. All rights reserved. + * BSD-Licensed: See the file "LICENSE.txt" for details. + */ + +#include "VersionDictionary.h" +#include "Chunk.h" +#include "Index.h" +#include "File.h" +#include "Chunk.h" +#include +#include +#include + +namespace Mooseyard { + + // Trailer is stored in file; all numbers are little-endian. + class VersionDictionary::Trailer :public Chunk { + public: + LittleEndian magicNumber1; + LittleEndian count; + VersionDictionary::IndexPositions indexPositions; + LittleEndian previousTrailerPosition; + LittleEndian generation; + LittleEndianDouble timestamp; + LittleEndian magicNumber2; + + Trailer () + :Chunk(sizeof(Trailer), kChunkType) + { } + + Trailer (int c, + VersionDictionary::IndexPositions indexPos, + FilePosition previousTrailerPos, + uint32_t gen) + :Chunk(sizeof(Trailer), kChunkType), + magicNumber1(kMagicNumber1), + count(c), + indexPositions(indexPos), + previousTrailerPosition(previousTrailerPos), + generation(gen), + timestamp(::time(NULL)), + magicNumber2(kMagicNumber2) + { } + + static const uint32_t kMagicNumber1 = 0xfe83b1cd; + static const uint32_t kMagicNumber2 = 0xff84b2c1; + }; + + + + VersionDictionary::VersionDictionary (File *file) + :_file(file), + _trailerPosition(0), + _previousTrailerPosition(0), + _indexPositions(), + _count(0), + _previousVersion() + { + if (file->length() > sizeof(Trailer)) + _read(); + } + + VersionDictionary::VersionDictionary (File *file, FilePosition trailerPosition) + :_file(file), + _trailerPosition(trailerPosition), + _previousTrailerPosition(0), + _indexPositions(), + _count(0), + _previousVersion() + { + _read(trailerPosition); + } + + int VersionDictionary::count() const { + return _count; + } + + const VersionDictionary::Trailer* VersionDictionary::_trailer() const { + if (_trailerPosition > 0) + return (const VersionDictionary::Trailer*) _file->mappedPosition(_trailerPosition); + else + return NULL; // Only happens in the empty-file case + } + + const Index* VersionDictionary::_index (int i) const { + if (_indexPositions[i] > 0) + return (const Index*) _file->mappedPosition(_indexPositions[i]); + else + return NULL; + } + + int VersionDictionary::generation() const { + const VersionDictionary::Trailer *trailer = _trailer(); + return trailer ? (int)trailer->generation : -1; + } + + time_t VersionDictionary::timestamp() const { + const VersionDictionary::Trailer *trailer = _trailer(); + return trailer ? (time_t)trailer->timestamp : 0; + } + + const VersionDictionary* VersionDictionary::previousVersion() const { + if (!_previousVersion) + if (_previousTrailerPosition > 0) + _previousVersion = new VersionDictionary(_file, _previousTrailerPosition); + return _previousVersion; + } + + + Blob VersionDictionary::get (Key key) const { + const Index *index = _index(key.hash >> 24); + return index ?index->get(_file, key) :Blob(); + } + + void VersionDictionary::_read (FilePosition trailerPos) { + assert(_file); + + if (trailerPos > 0) { + _file->setPosition(trailerPos); + } else { + // Determine position of trailer, at EOF: + off_t pos = _file->setPositionToEnd(sizeof(VersionDictionary::Trailer)); + if (pos < 0 || (pos & 0x03) || pos > UINT32_MAX) + throw File::Error(ERANGE, "No trailer found in file (wrong EOF)"); + trailerPos = pos; + } + + // Read & verify trailer: + VersionDictionary::Trailer trailer; + _file->read(trailer); + _trailerPosition = trailerPos; + _count = trailer.count; + _indexPositions = trailer.indexPositions; + + if (trailer.magicNumber1 != VersionDictionary::Trailer::kMagicNumber1 + || trailer.magicNumber2 != VersionDictionary::Trailer::kMagicNumber2) + throw File::Error(ERANGE, "No trailer found in file (invalid magic numbers)");\ + + + // Map in the file: + _file->mapRegion(0, _trailerPosition+sizeof(trailer)); + + // Verify Indexes: + for (int i=0; i<256; i++) { + const Index *index = _index(i); + if (index) + index->validate(); + } +} + + + bool VersionDictionary::isLatestVersion() const { + return _file->length() <= _trailerPosition+sizeof(VersionDictionary::Trailer); + } + + + /* Append addDict to baseDict, writing the results into dstFile (which is usually the same + as baseDict->file().) If 'replace' is true, ignore the old contents of baseDict. + Returns the position of the new trailer. */ + FilePosition VersionDictionary::_append (const VersionDictionary *baseDict, + const Dictionary *addDict, + File *dstFile, + bool replace) + { + File *srcFile = baseDict->_file; + bool incremental = !replace && dstFile==srcFile; + Index* newIndex[256]; + + { + // Work out the needed capacity for each Index bucket: + int newCounts[256] = {0}; + if (!replace) { + for (int i=0; i<256; i++) { + const Index *oldIndex = baseDict->_index(i); + if (oldIndex) + newCounts[i] = oldIndex->count(); + } + } + Dictionary::Iterator *it = addDict->iterate(); + for (; *it; ++*it) + newCounts[it->key().hash >> 24]++; + delete it; + + // Allocate new Indexes, of sufficient capacity, for each growing bucket: + for (int i=0; i<256; i++) { + const Index *oldIndex = baseDict->_index(i); + if (newCounts[i] && (!incremental || !oldIndex || newCounts[i] > oldIndex->count())) { + newIndex[i] = Index::create(newCounts[i]); + if (incremental && oldIndex) + newIndex[i]->copyFrom(srcFile, oldIndex); + } else + newIndex[i] = NULL; + } + } + + // Lock the file now, seek to the end, and make sure it's been prepared with a header, + // since FilePositions of 0 and 1 are reserved. + File::Lock lock(dstFile); + const FilePosition startPos = dstFile->setPositionToEnd(); + if (startPos <= 1) + throw File::Error(ERANGE, "Cannot write VersionDictionary to empty file"); + + // For safety's sake, make sure the old file hasn't been destroyed: + FilePosition oldEOF = baseDict->_trailerPosition; + if (oldEOF > 0) + oldEOF += sizeof(VersionDictionary::Trailer); + if (srcFile->length() < oldEOF) + throw File::Error(ERANGE, "File has been truncated since being read"); + + try{ + FilePosition pos = startPos; + + int newCount; + if (replace) + newCount = 0; + else if (dstFile == srcFile) + newCount = baseDict->_count; + else { + // Write out the surviving pre-existing entries from the old file: + newCount = 0; + for (VersionDictionary::Iterator it(baseDict); it; ++it) { + Key key = it.key(); + if (!addDict->contains(key)) { + int bucket = key.hash >> 24; + int size = KeyValueChunk::write(dstFile,pos, key, it.value()); + newIndex[bucket]->put(srcFile, key, pos, startPos); + pos += size; + newCount++; + } + } + } + + // Now write the items from the new dict: + Dictionary::Iterator *it = addDict->iterate(); + for (; *it; ++*it) { + Key key=it->key(); + Blob value=it->value(); + int bucket = key.hash >> 24; + Index *index = newIndex[bucket]; + assert(index); + + if (value.bytes) { + int size = KeyValueChunk::write(dstFile,pos, key, value); + if (index->put(srcFile, key, pos, startPos)) + newCount++; + pos += size; + } else if (incremental) { + // NULL value is a deleted-entry marker used by OverlayDictionary + if (index->remove(srcFile, key, startPos)) + newCount--; + } + } + delete it; + + // Write out the new indexes: + IndexPositions newIndexPositions; + if (incremental) + newIndexPositions = baseDict->_indexPositions; + else + memset(&newIndexPositions, 0, sizeof(newIndexPositions)); + + pos += Chunk::writePadding(dstFile); + for (int i=0; i<256; i++) { + if (newIndex[i]) { + Index *index = newIndex[i]; + newIndexPositions[i] = pos; + pos += dstFile->write(index, index->size()); + delete index; + } + } + + // Flush everything out to disk, with maximum paranoia, before writing the trailer. + // Since scavenging corrupt files looks for trailers, we don't want to append a + // trailer until we're certain that all of the real data is safely on-disk. + dstFile->flushDisk(); + + // Write the trailer: + FilePosition newTrailerPosition = pos; + VersionDictionary::Trailer trailer(newCount, + newIndexPositions, + baseDict->_trailerPosition, + baseDict->generation() + 1); + pos += dstFile->write(trailer); + + // Just a mild flush here; flushDisk() is very expensive and can be disruptive to + // real-time I/O in other apps, so it's bad to call it too often. + dstFile->flush(); + assert(pos==dstFile->position()); + + return newTrailerPosition; + + } catch (...) { + // If something goes wrong, try to back out everything we wrote: + try{ + dstFile->setLength(startPos); + }catch(...) { } + throw; + } + } + + +#pragma mark - +#pragma mark TESTING-ONLY: + + VersionDictionary* VersionDictionary::_appendAndOpen (const Dictionary *addDict, + File *dstFile, + bool replace) const + { + FilePosition nextVersionPos = _append(this, addDict, dstFile, replace); + VersionDictionary *nextVersion = new VersionDictionary(dstFile, nextVersionPos); + nextVersion->_previousVersion = this; + return nextVersion; + } + + VersionDictionary* VersionDictionary::create (File *file, const Dictionary *srcDict) { + return VersionDictionary(file)._appendAndOpen(srcDict, file, true); + } + + +#pragma mark - +#pragma mark ITERATOR: + + Dictionary::Iterator* VersionDictionary::iterate() const { + return new VersionDictionary::Iterator(this); + } + + VersionDictionary::Iterator::Iterator (const VersionDictionary *file) + :_file(file), + _bucket(-1), + _iter(NULL) + { + nextIndex(); + } + + VersionDictionary::Iterator::~Iterator() { + delete _iter; + } + + bool VersionDictionary::Iterator::hasValue() const {return _iter && _iter->hasValue();} + Key VersionDictionary::Iterator::key() const {return _iter->key();} + Blob VersionDictionary::Iterator::value() const {return _iter->value();} + + bool VersionDictionary::Iterator::next() { + if (_iter->next()) + return true; + else { + delete _iter; + return nextIndex(); + } + } + + bool VersionDictionary::Iterator::nextIndex() { + while (++_bucket < 256) { + const Index *index = _file->_index(_bucket); + if (index) { + _iter = new Index::Iterator(_file->_file, index); + if (*_iter) + return true; + else + delete _iter; + } + } + _iter = NULL; + return false; + } + + +#pragma mark - +#pragma mark CHANGE ITERATOR: + + VersionDictionary::ChangeIterator* VersionDictionary::iterateChanges() const { + return new VersionDictionary::ChangeIterator(this); + } + + VersionDictionary::ChangeIterator::ChangeIterator (const VersionDictionary *file) + :_file(file), + _bucket(-1), + _iter(NULL) + { + next(); + } + + VersionDictionary::ChangeIterator::~ChangeIterator() { + delete _iter; + } + + bool VersionDictionary::ChangeIterator::hasValue() const {return _iter && _iter->hasValue();} + Key VersionDictionary::ChangeIterator::key() const {return _iter->key();} + Blob VersionDictionary::ChangeIterator::value() const {return _iter->value();} + + bool VersionDictionary::ChangeIterator::next() { + const VersionDictionary::Trailer *trailer = _file->_trailer(); + for (;;) { + // Check if current iterator has a value that's from this version: + if (_iter && _iter->hasValue()) { + if (((Index::Iterator*)_iter)->valuePosition() > trailer->previousTrailerPosition) + return true; + else if (_iter->next()) + continue; + } + // If not, go to next Index: + if (!nextIndex()) + return false; + } + } + + bool VersionDictionary::ChangeIterator::nextIndex() { + delete _iter; + const VersionDictionary::Trailer *trailer = _file->_trailer(); + while (++_bucket < 256) { + const Index *index = _file->_index(_bucket); + if (index) { + // Skip indexes that weren't updated in this version: + if (trailer->indexPositions[_bucket] > trailer->previousTrailerPosition) { + _iter = new Index::Iterator(_file->_file, index); + return true; + } + } + } + _iter = NULL; + return false; + } + +}