src/VersionDictionary.cpp
author jbm@humility
Mon Sep 28 23:39:08 2009 -0700 (2009-09-28)
changeset 8 21a6c17f4e3e
parent 5 4c10b7956435
child 9 629f61203db1
permissions -rw-r--r--
jbm: it compiles... but bombs in unit tests
     1 /*
     2  *  VersionDictionary.cpp
     3  *  Ottoman
     4  *
     5  *  Created by Jens Alfke on 8/21/09.
     6  *  Copyright 2009 Jens Alfke. All rights reserved.
     7  *  BSD-Licensed: See the file "LICENSE.txt" for details.
     8  */
     9 
    10 #include "VersionDictionary.h"
    11 #include "Chunk.h"
    12 #include "Index.h"
    13 #include "File.h"
    14 #include "Chunk.h"
    15 #include <assert.h>
    16 #include <errno.h>
    17 #include <algorithm>
    18 #include <time.h>
    19 
    20 # define UINT32_MAX             (4294967295U)
    21 
    22 
    23 namespace Mooseyard {
    24             
    25     // Trailer is stored in file; all numbers are little-endian.
    26     class VersionDictionary::Trailer :public Chunk {
    27     public:
    28         LittleEndian<uint32_t>          magicNumber1;
    29         LittleEndian<uint32_t>          count;
    30         VersionDictionary::IndexPositions  indexPositions;
    31         LittleEndian<FilePosition>      previousTrailerPosition;
    32         LittleEndian<uint32_t>          generation;
    33         LittleEndianDouble              timestamp;
    34         LittleEndian<uint32_t>          magicNumber2;
    35                 
    36         Trailer ()
    37             :Chunk(sizeof(Trailer), kChunkType)
    38         { }
    39         
    40         Trailer (int c,
    41                      VersionDictionary::IndexPositions indexPos, 
    42                      FilePosition previousTrailerPos,
    43                      uint32_t gen)
    44             :Chunk(sizeof(Trailer), kChunkType),
    45              magicNumber1(kMagicNumber1),
    46              count(c),
    47              indexPositions(indexPos),
    48              previousTrailerPosition(previousTrailerPos),
    49              generation(gen),
    50              timestamp(::time(NULL)),
    51              magicNumber2(kMagicNumber2)
    52         { }
    53         
    54         static const uint32_t kMagicNumber1 = 0xfe83b1cd;
    55         static const uint32_t kMagicNumber2 = 0xff84b2c1;
    56     };
    57     
    58     
    59     
    60     VersionDictionary::VersionDictionary (File *file)
    61         :_file(file),
    62          _trailerPosition(0),
    63          _previousTrailerPosition(0),
    64          _indexPositions(),
    65          _count(0),
    66          _previousVersion()
    67     {
    68         if (file->length() > sizeof(Trailer))
    69             _read();
    70     }
    71     
    72     VersionDictionary::VersionDictionary (File *file, FilePosition trailerPosition)
    73         :_file(file),
    74          _trailerPosition(trailerPosition),
    75          _previousTrailerPosition(0),
    76          _indexPositions(),
    77          _count(0),
    78          _previousVersion()
    79     {
    80         _read(trailerPosition);
    81     }
    82     
    83     int VersionDictionary::count() const {
    84         return _count;
    85     }
    86     
    87     const VersionDictionary::Trailer* VersionDictionary::_trailer() const {
    88         if (_trailerPosition > 0)
    89             return (const VersionDictionary::Trailer*) _file->mappedPosition(_trailerPosition);
    90         else 
    91             return NULL;      // Only happens in the empty-file case
    92     }
    93     
    94     const Index* VersionDictionary::_index (int i) const {
    95         if (_indexPositions[i] > 0)
    96             return (const Index*) _file->mappedPosition(_indexPositions[i]);
    97         else
    98             return NULL;
    99     }
   100     
   101     int VersionDictionary::generation() const {
   102         const VersionDictionary::Trailer *trailer = _trailer();
   103         return trailer ? (int)trailer->generation : -1;
   104     }
   105     
   106     time_t VersionDictionary::timestamp() const {
   107         const VersionDictionary::Trailer *trailer = _trailer();
   108         return trailer ? (time_t)trailer->timestamp : 0;
   109     }
   110     
   111     const VersionDictionary* VersionDictionary::previousVersion() const {
   112         if (!_previousVersion)
   113             if (_previousTrailerPosition > 0)
   114                 _previousVersion = new VersionDictionary(_file, _previousTrailerPosition);
   115         return _previousVersion;
   116     }
   117         
   118     
   119     Blob VersionDictionary::get (Key key) const {
   120         const Index *index = _index(key.hash >> 24);
   121         return index ?index->get(_file, key) :Blob();
   122     }
   123     
   124     void VersionDictionary::_read (FilePosition trailerPos) {
   125         assert(_file);
   126         
   127         if (trailerPos > 0) {
   128             _file->setPosition(trailerPos);
   129         } else {
   130             // Determine position of trailer, at EOF:
   131             off_t pos = _file->setPositionToEnd(sizeof(VersionDictionary::Trailer));
   132             if (pos < 0 || (pos & 0x03) || pos > UINT32_MAX)
   133                 throw File::Error(ERANGE, "No trailer found in file (wrong EOF)");
   134             trailerPos = (FilePosition)pos;
   135         }
   136         
   137         // Read & verify trailer:
   138         VersionDictionary::Trailer trailer;
   139         _file->read(trailer);
   140         _trailerPosition = trailerPos;
   141         _previousTrailerPosition = trailer.previousTrailerPosition;
   142         _count = trailer.count;
   143         _indexPositions = trailer.indexPositions;
   144         
   145         if (trailer.magicNumber1 != VersionDictionary::Trailer::kMagicNumber1
   146             || trailer.magicNumber2 != VersionDictionary::Trailer::kMagicNumber2)
   147             throw File::Error("No trailer found in file (invalid magic numbers)");
   148         if (_previousTrailerPosition >= _trailerPosition)
   149             throw File::Error("Bad VersionDictionary trailer (illegal previousTrailerPosition)");
   150         
   151         // Map in the file:
   152         _file->mapRegion(0, _trailerPosition+sizeof(trailer));
   153 
   154         // Verify Indexes:
   155         for (int i=0; i<256; i++) {
   156             if (_indexPositions[i] > 0)
   157                 if (_indexPositions[i] < _previousTrailerPosition || _indexPositions[i] >= _trailerPosition)
   158                     throw File::Error("Bad VersionDictionary trailer (illegal index position)");
   159             const Index *index = _index(i);
   160             if (index)
   161                 index->validate();
   162         }
   163 }
   164     
   165     
   166     bool VersionDictionary::isLatestVersion() const {
   167         return _file->length() <= _trailerPosition+sizeof(VersionDictionary::Trailer);
   168     }
   169     
   170     
   171     /*  Append addDict to baseDict, writing the results into dstFile (which is usually the same
   172         as baseDict->file().) If 'replace' is true, ignore the old contents of baseDict.
   173         Returns the position of the new trailer. */
   174     FilePosition VersionDictionary::_append (const VersionDictionary *baseDict, 
   175                                           const Dictionary *addDict, 
   176                                           File *dstFile, 
   177                                           bool replace) 
   178     {
   179         File *srcFile = baseDict->_file;
   180         bool incremental = !replace && dstFile==srcFile;
   181         Index* newIndex[256];
   182 
   183         {
   184             // Work out the needed capacity for each Index bucket:
   185             int newCounts[256] = {0};
   186             if (!replace) {
   187                 for (int i=0; i<256; i++) {
   188                     const Index *oldIndex = baseDict->_index(i);
   189                     if (oldIndex)
   190                         newCounts[i] = oldIndex->count();
   191                 }
   192             }
   193             Dictionary::Iterator *it = addDict->iterate();
   194             for (; *it; ++*it)
   195                 newCounts[it->key().hash >> 24]++;
   196             delete it;
   197             
   198             // Allocate new Indexes, of sufficient capacity, for each growing bucket:
   199             for (int i=0; i<256; i++) {
   200                 const Index *oldIndex = baseDict->_index(i);
   201                 if (newCounts[i] && (!incremental || !oldIndex || newCounts[i] > oldIndex->count())) {
   202                     newIndex[i] = Index::create(newCounts[i]);
   203                     if (incremental && oldIndex)
   204                         newIndex[i]->copyFrom(srcFile, oldIndex);
   205                 } else
   206                     newIndex[i] = NULL;
   207             }
   208         }
   209         
   210         // Lock the file now, seek to the end, and make sure it's been prepared with a header,
   211         // since FilePositions of 0 and 1 are reserved.
   212         File::Lock lock(dstFile);
   213         const FilePosition startPos = (FilePosition) dstFile->setPositionToEnd();
   214         if (startPos <= 1)
   215             throw File::Error(ERANGE, "Cannot write VersionDictionary to empty file");
   216         
   217         // For safety's sake, make sure the old file hasn't been destroyed:
   218         FilePosition oldEOF = baseDict->_trailerPosition;
   219         if (oldEOF > 0)
   220             oldEOF += sizeof(VersionDictionary::Trailer);
   221         if (srcFile->length() < oldEOF)
   222             throw File::Error(ERANGE, "File has been truncated since being read");
   223             
   224         try{
   225             FilePosition pos = startPos;
   226             
   227             int newCount;
   228             if (replace)
   229                 newCount = 0;
   230             else if (dstFile == srcFile)
   231                 newCount = baseDict->_count;
   232             else {
   233                 // Write out the surviving pre-existing entries from the old file:
   234                 newCount = 0;
   235                 for (VersionDictionary::Iterator it(baseDict); it; ++it) {
   236                     Key key = it.key();
   237                     if (!addDict->contains(key)) {
   238                         int bucket = key.hash >> 24;
   239                         int size = KeyValueChunk::write(dstFile,pos, key, it.value());
   240                         newIndex[bucket]->put(srcFile, key, pos, startPos);
   241                         pos += size;
   242                         newCount++;
   243                     }
   244                 }
   245             }
   246             
   247             // Now write the items from the new dict:
   248             Dictionary::Iterator *it = addDict->iterate();
   249             for (; *it; ++*it) {
   250                 Key key=it->key();
   251                 Blob value=it->value();
   252                 int bucket = key.hash >> 24;
   253                 Index *index = newIndex[bucket];
   254                 assert(index);
   255                                               
   256                 if (value.bytes) {
   257                     int size = KeyValueChunk::write(dstFile,pos, key, value);
   258                     if (index->put(srcFile, key, pos, startPos))
   259                         newCount++;
   260                     pos += size;
   261                 } else if (incremental) {
   262                     // NULL value is a deleted-entry marker used by OverlayDictionary
   263                     if (index->remove(srcFile, key, startPos))
   264                         newCount--;
   265                 }
   266             }
   267             delete it;
   268             
   269             // Write out the new indexes:
   270             IndexPositions newIndexPositions;
   271             if (incremental)
   272                 newIndexPositions = baseDict->_indexPositions;
   273             else
   274                 memset(&newIndexPositions, 0, sizeof(newIndexPositions));
   275 
   276             pos += Chunk::writePadding(dstFile);
   277             for (int i=0; i<256; i++) {
   278                 if (newIndex[i]) {
   279                     Index *index = newIndex[i];
   280                     newIndexPositions[i] = pos;
   281                     pos += dstFile->write(index, index->size());
   282                     delete index;
   283                 }
   284             }
   285             
   286             // Flush everything out to disk, with maximum paranoia, before writing the trailer.
   287             // Since scavenging corrupt files looks for trailers, we don't want to append a
   288             // trailer until we're certain that all of the real data is safely on-disk.
   289             dstFile->flushDisk();
   290             
   291             // Write the trailer:
   292             FilePosition newTrailerPosition = pos;
   293             VersionDictionary::Trailer trailer(newCount,
   294                                             newIndexPositions,
   295                                             baseDict->_trailerPosition,
   296                                             baseDict->generation() + 1);
   297             pos += dstFile->write(trailer);
   298             
   299             // Just a mild flush here; flushDisk() is very expensive and can be disruptive to
   300             // real-time I/O in other apps, so it's bad to call it too often.
   301             dstFile->flush();
   302             assert(pos==dstFile->position());
   303             
   304             return newTrailerPosition;
   305             
   306         } catch (...) {
   307             // If something goes wrong, try to back out everything we wrote:
   308             try{
   309                 dstFile->setLength(startPos);
   310             }catch(...) { }
   311             throw;
   312         }
   313     }
   314  
   315     
   316 #pragma mark -
   317 #pragma mark TESTING-ONLY:
   318     
   319     VersionDictionary* VersionDictionary::_appendAndOpen (const Dictionary *addDict,
   320                                                     File *dstFile, 
   321                                                     bool replace) const
   322     {
   323         FilePosition nextVersionPos = _append(this, addDict, dstFile, replace);
   324         VersionDictionary *nextVersion = new VersionDictionary(dstFile, nextVersionPos);
   325         nextVersion->_previousVersion = this;
   326         return nextVersion;
   327     }
   328     
   329     VersionDictionary* VersionDictionary::create (File *file, const Dictionary *srcDict) {
   330         return VersionDictionary(file)._appendAndOpen(srcDict, file, true);
   331     }
   332     
   333     
   334 #pragma mark -
   335 #pragma mark ITERATOR:
   336     
   337     Dictionary::Iterator* VersionDictionary::iterate() const {
   338         return new VersionDictionary::Iterator(this);
   339     }
   340     
   341     VersionDictionary::Iterator::Iterator (const VersionDictionary *file)
   342     :_file(file),
   343      _bucket(-1),
   344      _iter(NULL)
   345     { 
   346         nextIndex();
   347     }
   348     
   349     VersionDictionary::Iterator::~Iterator() {
   350         delete _iter;
   351     }
   352     
   353     bool VersionDictionary::Iterator::hasValue() const               {return _iter && _iter->hasValue();}
   354     Key VersionDictionary::Iterator::key() const                     {return _iter->key();}
   355     Blob VersionDictionary::Iterator::value() const                  {return _iter->value();}
   356     
   357     bool VersionDictionary::Iterator::next() {
   358         if (_iter->next())
   359             return true;
   360         else {
   361             delete _iter;
   362             return nextIndex();
   363         }
   364     }
   365     
   366     bool VersionDictionary::Iterator::nextIndex() {
   367         while (++_bucket < 256) {
   368             const Index *index = _file->_index(_bucket);
   369             if (index) {
   370                 _iter = new Index::Iterator(_file->_file, index);
   371                 if (*_iter)
   372                     return true;
   373                 else
   374                     delete _iter;
   375             }
   376         }
   377         _iter = NULL;
   378         return false;
   379     }
   380     
   381     
   382 #pragma mark -
   383 #pragma mark CHANGE ITERATOR:
   384     
   385     VersionDictionary::ChangeIterator* VersionDictionary::iterateChanges() const {
   386         return new VersionDictionary::ChangeIterator(this);
   387     }
   388     
   389     VersionDictionary::ChangeIterator::ChangeIterator (const VersionDictionary *file)
   390     :_file(file),
   391      _bucket(-1),
   392      _iter(NULL)
   393     { 
   394         next();            
   395     }
   396     
   397     VersionDictionary::ChangeIterator::~ChangeIterator() {
   398         delete _iter;
   399     }
   400     
   401     bool VersionDictionary::ChangeIterator::hasValue() const               {return _iter && _iter->hasValue();}
   402     Key VersionDictionary::ChangeIterator::key() const                     {return _iter->key();}
   403     Blob VersionDictionary::ChangeIterator::value() const                  {return _iter->value();}
   404     
   405     bool VersionDictionary::ChangeIterator::next() {
   406         const VersionDictionary::Trailer *trailer = _file->_trailer();
   407         for (;;) {
   408             // Check if current iterator has a value that's from this version:
   409             if (_iter && _iter->hasValue()) {
   410                 if (((Index::Iterator*)_iter)->valuePosition() > trailer->previousTrailerPosition)
   411                     return true;
   412                 else if (_iter->next())
   413                     continue;
   414             }
   415             // If not, go to next Index:
   416             if (!nextIndex())
   417                 return false;
   418         }
   419     }
   420     
   421     bool VersionDictionary::ChangeIterator::nextIndex() {
   422         delete _iter;
   423         const VersionDictionary::Trailer *trailer = _file->_trailer();
   424         while (++_bucket < 256) {
   425             const Index *index = _file->_index(_bucket);
   426             if (index) {
   427                 // Skip indexes that weren't updated in this version:
   428                 if (trailer->indexPositions[_bucket] > trailer->previousTrailerPosition) {
   429                     _iter = new Index::Iterator(_file->_file, index);
   430                     return true;
   431                 }
   432             }
   433         }
   434         _iter = NULL;
   435         return false;
   436     }
   437     
   438 }