src/VersionDictionary.cpp
author Jens Alfke <jens@mooseyard.com>
Sun Sep 20 15:14:12 2009 -0700 (2009-09-20)
changeset 0 31a43d94cc26
child 2 851de24ecb61
permissions -rw-r--r--
First official checkin.
     1 /*
     2  *  VersionDictionary.cpp
     3  *  Ottoman
     4  *
     5  *  Created by Jens Alfke on 8/21/09.
     6  *  Copyright 2009 Jens Alfke. All rights reserved.
     7  *  BSD-Licensed: See the file "LICENSE.txt" for details.
     8  */
     9 
    10 #include "VersionDictionary.h"
    11 #include "Chunk.h"
    12 #include "Index.h"
    13 #include "File.h"
    14 #include "Chunk.h"
    15 #include <assert.h>
    16 #include <errno.h>
    17 #include <algorithm>
    18 
    19 namespace Mooseyard {
    20             
    // Trailer is stored in file; all numbers are little-endian.
    // One Trailer is written by _append() after a version's data and indexes, and is read
    // back by _read(), which checks both magic numbers before trusting the other fields.
    // NOTE(review): the object is handed to File::read()/File::write() as a whole, so the
    // member order and sizes below presumably define the on-disk layout -- do not reorder.
    class VersionDictionary::Trailer :public Chunk {
    public:
        LittleEndian<uint32_t>          magicNumber1;               // expected to equal kMagicNumber1
        LittleEndian<uint32_t>          count;                      // number of entries in this version
        VersionDictionary::IndexPositions  indexPositions;          // file position of each bucket's Index (0 = none)
        LittleEndian<FilePosition>      previousTrailerPosition;    // trailer position of the version this one was appended to
        LittleEndian<uint32_t>          generation;                 // generation of the base version, plus one
        LittleEndianDouble              timestamp;                  // ::time() at the moment the trailer was constructed
        LittleEndian<uint32_t>          magicNumber2;               // expected to equal kMagicNumber2
                
        // Constructs a Trailer without setting any of its fields here; _read() uses this
        // as the destination buffer for File::read().
        Trailer ()
            :Chunk(sizeof(Trailer), kChunkType)
        { }
        
        // Constructs a fully populated Trailer, ready to be written to a file.
        //   c                   entry count of the new version
        //   indexPos            positions of the per-bucket Indexes
        //   previousTrailerPos  trailer position of the version being extended
        //   gen                 generation number of the new version
        // The timestamp is taken from the current time and both magic numbers are filled in.
        Trailer (int c,
                     VersionDictionary::IndexPositions indexPos, 
                     FilePosition previousTrailerPos,
                     uint32_t gen)
            :Chunk(sizeof(Trailer), kChunkType),
             magicNumber1(kMagicNumber1),
             count(c),
             indexPositions(indexPos),
             previousTrailerPosition(previousTrailerPos),
             generation(gen),
             timestamp(::time(NULL)),
             magicNumber2(kMagicNumber2)
        { }
        
        // Marker values stored at the start and end of the trailer; _read() rejects a
        // trailer in which either one does not match.
        static const uint32_t kMagicNumber1 = 0xfe83b1cd;
        static const uint32_t kMagicNumber2 = 0xff84b2c1;
    };
    53     
    54     
    55     
    56     VersionDictionary::VersionDictionary (File *file)
    57         :_file(file),
    58          _trailerPosition(0),
    59          _previousTrailerPosition(0),
    60          _indexPositions(),
    61          _count(0),
    62          _previousVersion()
    63     {
    64         if (file->length() > sizeof(Trailer))
    65             _read();
    66     }
    67     
    68     VersionDictionary::VersionDictionary (File *file, FilePosition trailerPosition)
    69         :_file(file),
    70          _trailerPosition(trailerPosition),
    71          _previousTrailerPosition(0),
    72          _indexPositions(),
    73          _count(0),
    74          _previousVersion()
    75     {
    76         _read(trailerPosition);
    77     }
    78     
    79     int VersionDictionary::count() const {
    80         return _count;
    81     }
    82     
    83     const VersionDictionary::Trailer* VersionDictionary::_trailer() const {
    84         if (_trailerPosition > 0)
    85             return (const VersionDictionary::Trailer*) _file->mappedPosition(_trailerPosition);
    86         else 
    87             return NULL;      // Only happens in the empty-file case
    88     }
    89     
    90     const Index* VersionDictionary::_index (int i) const {
    91         if (_indexPositions[i] > 0)
    92             return (const Index*) _file->mappedPosition(_indexPositions[i]);
    93         else
    94             return NULL;
    95     }
    96     
    97     int VersionDictionary::generation() const {
    98         const VersionDictionary::Trailer *trailer = _trailer();
    99         return trailer ? (int)trailer->generation : -1;
   100     }
   101     
   102     time_t VersionDictionary::timestamp() const {
   103         const VersionDictionary::Trailer *trailer = _trailer();
   104         return trailer ? (time_t)trailer->timestamp : 0;
   105     }
   106     
   107     const VersionDictionary* VersionDictionary::previousVersion() const {
   108         if (!_previousVersion)
   109             if (_previousTrailerPosition > 0)
   110                 _previousVersion = new VersionDictionary(_file, _previousTrailerPosition);
   111         return _previousVersion;
   112     }
   113         
   114     
   115     Blob VersionDictionary::get (Key key) const {
   116         const Index *index = _index(key.hash >> 24);
   117         return index ?index->get(_file, key) :Blob();
   118     }
   119     
   120     void VersionDictionary::_read (FilePosition trailerPos) {
   121         assert(_file);
   122         
   123         if (trailerPos > 0) {
   124             _file->setPosition(trailerPos);
   125         } else {
   126             // Determine position of trailer, at EOF:
   127             off_t pos = _file->setPositionToEnd(sizeof(VersionDictionary::Trailer));
   128             if (pos < 0 || (pos & 0x03) || pos > UINT32_MAX)
   129                 throw File::Error(ERANGE, "No trailer found in file (wrong EOF)");
   130             trailerPos = pos;
   131         }
   132         
   133         // Read & verify trailer:
   134         VersionDictionary::Trailer trailer;
   135         _file->read(trailer);
   136         _trailerPosition = trailerPos;
   137         _count = trailer.count;
   138         _indexPositions = trailer.indexPositions;
   139         
   140         if (trailer.magicNumber1 != VersionDictionary::Trailer::kMagicNumber1
   141             || trailer.magicNumber2 != VersionDictionary::Trailer::kMagicNumber2)
   142             throw File::Error(ERANGE, "No trailer found in file (invalid magic numbers)");\
   143 
   144         
   145         // Map in the file:
   146         _file->mapRegion(0, _trailerPosition+sizeof(trailer));
   147 
   148         // Verify Indexes:
   149         for (int i=0; i<256; i++) {
   150             const Index *index = _index(i);
   151             if (index)
   152                 index->validate();
   153         }
   154 }
   155     
   156     
   157     bool VersionDictionary::isLatestVersion() const {
   158         return _file->length() <= _trailerPosition+sizeof(VersionDictionary::Trailer);
   159     }
   160     
   161     
   162     /*  Append addDict to baseDict, writing the results into dstFile (which is usually the same
   163         as baseDict->file().) If 'replace' is true, ignore the old contents of baseDict.
   164         Returns the position of the new trailer. */
   165     FilePosition VersionDictionary::_append (const VersionDictionary *baseDict, 
   166                                           const Dictionary *addDict, 
   167                                           File *dstFile, 
   168                                           bool replace) 
   169     {
   170         File *srcFile = baseDict->_file;
   171         bool incremental = !replace && dstFile==srcFile;
   172         Index* newIndex[256];
   173 
   174         {
   175             // Work out the needed capacity for each Index bucket:
   176             int newCounts[256] = {0};
   177             if (!replace) {
   178                 for (int i=0; i<256; i++) {
   179                     const Index *oldIndex = baseDict->_index(i);
   180                     if (oldIndex)
   181                         newCounts[i] = oldIndex->count();
   182                 }
   183             }
   184             Dictionary::Iterator *it = addDict->iterate();
   185             for (; *it; ++*it)
   186                 newCounts[it->key().hash >> 24]++;
   187             delete it;
   188             
   189             // Allocate new Indexes, of sufficient capacity, for each growing bucket:
   190             for (int i=0; i<256; i++) {
   191                 const Index *oldIndex = baseDict->_index(i);
   192                 if (newCounts[i] && (!incremental || !oldIndex || newCounts[i] > oldIndex->count())) {
   193                     newIndex[i] = Index::create(newCounts[i]);
   194                     if (incremental && oldIndex)
   195                         newIndex[i]->copyFrom(srcFile, oldIndex);
   196                 } else
   197                     newIndex[i] = NULL;
   198             }
   199         }
   200         
   201         // Lock the file now, seek to the end, and make sure it's been prepared with a header,
   202         // since FilePositions of 0 and 1 are reserved.
   203         File::Lock lock(dstFile);
   204         const FilePosition startPos = dstFile->setPositionToEnd();
   205         if (startPos <= 1)
   206             throw File::Error(ERANGE, "Cannot write VersionDictionary to empty file");
   207         
   208         // For safety's sake, make sure the old file hasn't been destroyed:
   209         FilePosition oldEOF = baseDict->_trailerPosition;
   210         if (oldEOF > 0)
   211             oldEOF += sizeof(VersionDictionary::Trailer);
   212         if (srcFile->length() < oldEOF)
   213             throw File::Error(ERANGE, "File has been truncated since being read");
   214             
   215         try{
   216             FilePosition pos = startPos;
   217             
   218             int newCount;
   219             if (replace)
   220                 newCount = 0;
   221             else if (dstFile == srcFile)
   222                 newCount = baseDict->_count;
   223             else {
   224                 // Write out the surviving pre-existing entries from the old file:
   225                 newCount = 0;
   226                 for (VersionDictionary::Iterator it(baseDict); it; ++it) {
   227                     Key key = it.key();
   228                     if (!addDict->contains(key)) {
   229                         int bucket = key.hash >> 24;
   230                         int size = KeyValueChunk::write(dstFile,pos, key, it.value());
   231                         newIndex[bucket]->put(srcFile, key, pos, startPos);
   232                         pos += size;
   233                         newCount++;
   234                     }
   235                 }
   236             }
   237             
   238             // Now write the items from the new dict:
   239             Dictionary::Iterator *it = addDict->iterate();
   240             for (; *it; ++*it) {
   241                 Key key=it->key();
   242                 Blob value=it->value();
   243                 int bucket = key.hash >> 24;
   244                 Index *index = newIndex[bucket];
   245                 assert(index);
   246                                               
   247                 if (value.bytes) {
   248                     int size = KeyValueChunk::write(dstFile,pos, key, value);
   249                     if (index->put(srcFile, key, pos, startPos))
   250                         newCount++;
   251                     pos += size;
   252                 } else if (incremental) {
   253                     // NULL value is a deleted-entry marker used by OverlayDictionary
   254                     if (index->remove(srcFile, key, startPos))
   255                         newCount--;
   256                 }
   257             }
   258             delete it;
   259             
   260             // Write out the new indexes:
   261             IndexPositions newIndexPositions;
   262             if (incremental)
   263                 newIndexPositions = baseDict->_indexPositions;
   264             else
   265                 memset(&newIndexPositions, 0, sizeof(newIndexPositions));
   266 
   267             pos += Chunk::writePadding(dstFile);
   268             for (int i=0; i<256; i++) {
   269                 if (newIndex[i]) {
   270                     Index *index = newIndex[i];
   271                     newIndexPositions[i] = pos;
   272                     pos += dstFile->write(index, index->size());
   273                     delete index;
   274                 }
   275             }
   276             
   277             // Flush everything out to disk, with maximum paranoia, before writing the trailer.
   278             // Since scavenging corrupt files looks for trailers, we don't want to append a
   279             // trailer until we're certain that all of the real data is safely on-disk.
   280             dstFile->flushDisk();
   281             
   282             // Write the trailer:
   283             FilePosition newTrailerPosition = pos;
   284             VersionDictionary::Trailer trailer(newCount,
   285                                             newIndexPositions,
   286                                             baseDict->_trailerPosition,
   287                                             baseDict->generation() + 1);
   288             pos += dstFile->write(trailer);
   289             
   290             // Just a mild flush here; flushDisk() is very expensive and can be disruptive to
   291             // real-time I/O in other apps, so it's bad to call it too often.
   292             dstFile->flush();
   293             assert(pos==dstFile->position());
   294             
   295             return newTrailerPosition;
   296             
   297         } catch (...) {
   298             // If something goes wrong, try to back out everything we wrote:
   299             try{
   300                 dstFile->setLength(startPos);
   301             }catch(...) { }
   302             throw;
   303         }
   304     }
   305  
   306     
   307 #pragma mark -
   308 #pragma mark TESTING-ONLY:
   309     
   310     VersionDictionary* VersionDictionary::_appendAndOpen (const Dictionary *addDict,
   311                                                     File *dstFile, 
   312                                                     bool replace) const
   313     {
   314         FilePosition nextVersionPos = _append(this, addDict, dstFile, replace);
   315         VersionDictionary *nextVersion = new VersionDictionary(dstFile, nextVersionPos);
   316         nextVersion->_previousVersion = this;
   317         return nextVersion;
   318     }
   319     
   320     VersionDictionary* VersionDictionary::create (File *file, const Dictionary *srcDict) {
   321         return VersionDictionary(file)._appendAndOpen(srcDict, file, true);
   322     }
   323     
   324     
   325 #pragma mark -
   326 #pragma mark ITERATOR:
   327     
   328     Dictionary::Iterator* VersionDictionary::iterate() const {
   329         return new VersionDictionary::Iterator(this);
   330     }
   331     
   332     VersionDictionary::Iterator::Iterator (const VersionDictionary *file)
   333     :_file(file),
   334      _bucket(-1),
   335      _iter(NULL)
   336     { 
   337         nextIndex();
   338     }
   339     
   340     VersionDictionary::Iterator::~Iterator() {
   341         delete _iter;
   342     }
   343     
   344     bool VersionDictionary::Iterator::hasValue() const               {return _iter && _iter->hasValue();}
   345     Key VersionDictionary::Iterator::key() const                     {return _iter->key();}
   346     Blob VersionDictionary::Iterator::value() const                  {return _iter->value();}
   347     
   348     bool VersionDictionary::Iterator::next() {
   349         if (_iter->next())
   350             return true;
   351         else {
   352             delete _iter;
   353             return nextIndex();
   354         }
   355     }
   356     
   357     bool VersionDictionary::Iterator::nextIndex() {
   358         while (++_bucket < 256) {
   359             const Index *index = _file->_index(_bucket);
   360             if (index) {
   361                 _iter = new Index::Iterator(_file->_file, index);
   362                 if (*_iter)
   363                     return true;
   364                 else
   365                     delete _iter;
   366             }
   367         }
   368         _iter = NULL;
   369         return false;
   370     }
   371     
   372     
   373 #pragma mark -
   374 #pragma mark CHANGE ITERATOR:
   375     
   376     VersionDictionary::ChangeIterator* VersionDictionary::iterateChanges() const {
   377         return new VersionDictionary::ChangeIterator(this);
   378     }
   379     
   380     VersionDictionary::ChangeIterator::ChangeIterator (const VersionDictionary *file)
   381     :_file(file),
   382      _bucket(-1),
   383      _iter(NULL)
   384     { 
   385         next();            
   386     }
   387     
   388     VersionDictionary::ChangeIterator::~ChangeIterator() {
   389         delete _iter;
   390     }
   391     
   392     bool VersionDictionary::ChangeIterator::hasValue() const               {return _iter && _iter->hasValue();}
   393     Key VersionDictionary::ChangeIterator::key() const                     {return _iter->key();}
   394     Blob VersionDictionary::ChangeIterator::value() const                  {return _iter->value();}
   395     
   396     bool VersionDictionary::ChangeIterator::next() {
   397         const VersionDictionary::Trailer *trailer = _file->_trailer();
   398         for (;;) {
   399             // Check if current iterator has a value that's from this version:
   400             if (_iter && _iter->hasValue()) {
   401                 if (((Index::Iterator*)_iter)->valuePosition() > trailer->previousTrailerPosition)
   402                     return true;
   403                 else if (_iter->next())
   404                     continue;
   405             }
   406             // If not, go to next Index:
   407             if (!nextIndex())
   408                 return false;
   409         }
   410     }
   411     
   412     bool VersionDictionary::ChangeIterator::nextIndex() {
   413         delete _iter;
   414         const VersionDictionary::Trailer *trailer = _file->_trailer();
   415         while (++_bucket < 256) {
   416             const Index *index = _file->_index(_bucket);
   417             if (index) {
   418                 // Skip indexes that weren't updated in this version:
   419                 if (trailer->indexPositions[_bucket] > trailer->previousTrailerPosition) {
   420                     _iter = new Index::Iterator(_file->_file, index);
   421                     return true;
   422                 }
   423             }
   424         }
   425         _iter = NULL;
   426         return false;
   427     }
   428     
   429 }