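What is version information for? First, a brief description of what each of the three classes is used for:
With the description above in hand, what is version information actually for?
If the answer still isn't clear, treat the three classes as a whole and look at what information the Version class group actually contains:
Let's set the question of what version information is for aside for a moment and look at the individual classes.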
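VersionSet (maintains a list of Versions containing every Version that is currently alive; the most recently appended Version is the database's current version)
There is a single VersionSet instance, held by DBImpl (the database implementation class), and it maintains all live Version objects. Let's look at every context in which VersionSet is used:
1. At database startup: the Manifest file is located through the CURRENT file, and reading the Manifest restores the version information.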
Status VersionSet::Recover() {
  ......
  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  ......
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);  // the manifest file
  ......

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  VersionSet::Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    // Read the VersionEdit records in the manifest one by one to build the VersionSet
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // Return an error when the comparator does not match
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
          // In fact, we could simply break here
        }
      }

      if (s.ok()) {
        builder.Apply(&edit);  // build the current Version
      }

      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  ......

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);  // compute the level for the next compaction
    AppendVersion(v);
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}
Recover restores the VersionSet and the current Version from the Manifest. Once recovery finishes, the list of alive Versions contains only the current Version object.
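The replay loop in Recover can be pictured with a toy model. The sketch below is only an illustration under simplifying assumptions: `ToyEdit`, `ToyState`, and `Replay` are hypothetical names, not LevelDB types, and files are reduced to (level, file number) pairs. It shows how applying the manifest records in order yields one final state, with later records overriding earlier ones.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for one VersionEdit record in the manifest.
struct ToyEdit {
  bool has_log_number = false;
  uint64_t log_number = 0;
  std::vector<std::pair<int, uint64_t>> added;    // (level, file number)
  std::vector<std::pair<int, uint64_t>> deleted;  // (level, file number)
};

// Hypothetical stand-in for the state recovered from the manifest.
struct ToyState {
  std::set<std::pair<int, uint64_t>> files;  // live (level, file number) pairs
  uint64_t log_number = 0;
};

// Replays the records in manifest order; later records override earlier ones.
ToyState Replay(const std::vector<ToyEdit>& manifest) {
  ToyState state;
  for (const ToyEdit& edit : manifest) {
    for (const auto& d : edit.deleted) state.files.erase(d);
    for (const auto& a : edit.added) {
      assert(a.first >= 0);  // a level is never negative
      state.files.insert(a);
    }
    if (edit.has_log_number) state.log_number = edit.log_number;
  }
  return state;
}
```

Replaying two records, where the second deletes a file the first added, leaves only the surviving files in the result. This is why only a single Version is alive right after recovery.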
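2. During compaction: compaction is probably the most complex feature in LevelDB, and it requires deep involvement from the Version class group. Here are all the compaction-related interface declarations in VersionSet: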
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version.  Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu);

// Pick level and inputs for a new compaction.
// Returns NULL if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction.  Caller should delete the result.
Compaction* PickCompaction();

// Return a compaction object for compacting the range [begin,end] in
// the specified level.  Returns NULL if there is nothing in that
// level that overlaps the specified range.  Caller should delete
// the result.
Compaction* CompactRange(
    int level,
    const InternalKey* begin,
    const InternalKey* end);

// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);

// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
  Version* v = current_;
  return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}

// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);
Both reads and writes can trigger a compaction. NeedsCompaction decides whether one is needed; if so, PickCompaction is called to obtain the compaction information.
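To make the `compaction_score_ >= 1` test more concrete, here is a minimal sketch of one way such a score could be computed. Everything in it is an assumption for illustration: `LevelStats`, `CompactionChoice`, and `PickByScore` are hypothetical names, level 0 is scored by file count and deeper levels by total bytes, and the trigger and byte budgets are caller-supplied parameters rather than values taken from the text. The seek-triggered `file_to_compact_` path is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-level summary; a real Version tracks full file metadata.
struct LevelStats {
  size_t num_files;
  uint64_t total_bytes;
};

struct CompactionChoice {
  int level;     // level with the highest score, or -1 if there are no levels
  double score;  // >= 1 means the level is over its budget
};

// Scores level 0 by file count and deeper levels by bytes, keeping the maximum.
// l0_trigger and max_bytes are illustrative, caller-supplied parameters.
CompactionChoice PickByScore(const std::vector<LevelStats>& levels,
                             size_t l0_trigger,
                             const std::vector<uint64_t>& max_bytes) {
  assert(l0_trigger > 0);
  assert(max_bytes.size() == levels.size());
  CompactionChoice best{-1, -1.0};
  for (size_t i = 0; i < levels.size(); ++i) {
    double score;
    if (i == 0) {
      score = static_cast<double>(levels[i].num_files) / l0_trigger;
    } else {
      assert(max_bytes[i] > 0);
      score = static_cast<double>(levels[i].total_bytes) / max_bytes[i];
    }
    if (score > best.score) best = CompactionChoice{static_cast<int>(i), score};
  }
  return best;
}

// Mirrors only the score half of the NeedsCompaction check.
bool NeedsCompaction(const CompactionChoice& c) { return c.score >= 1.0; }
```

Normalizing every level to a ratio against its own budget lets a single comparison against 1 cover all levels, whatever unit each level is measured in.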
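The other methods are also compaction-related. LogAndApply is particularly important: it applies a VersionEdit to the current Version, persists the VersionEdit to the Manifest file, and installs the new Version as the current Version.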
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  }
  else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  // 1. New Version = Current Version + VersionEdit
  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  // 2. Recompute the compaction level and compaction score
  Finalize(v);

  // 3. When opening the database, create a new Manifest and save the
  //    current version information to it
  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == NULL) {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);  // current version information
    }
  }

  // 4. Save the incremental change, i.e. the VersionEdit
  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Write new record to MANIFEST log
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  // 5. Add the new version to the list of alive versions and make it
  //    the current Version
  // Install the new version
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  }
  else {
    delete v;
    if (!new_manifest_file.empty()) {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}
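The build, persist, install ordering in LogAndApply can be sketched with a toy model. This is an illustration under stated assumptions, not LevelDB's implementation: `ToyEdit`, `ToyVersion`, and `ToyVersionSet` are hypothetical, the manifest is an in-memory vector, and the `log_ok` flag simulates whether the manifest write succeeded. `std::shared_ptr` stands in for the reference counting that keeps older Versions alive.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>
#include <vector>

// Hypothetical stand-ins; the real classes carry far more state.
struct ToyEdit {
  std::vector<uint64_t> added;    // file numbers added by this change
  std::vector<uint64_t> deleted;  // file numbers removed by this change
};

struct ToyVersion {
  std::set<uint64_t> files;  // treated as immutable once installed
};

class ToyVersionSet {
 public:
  ToyVersionSet() : current_(std::make_shared<ToyVersion>()) {}

  // Builds current + edit, "persists" the edit, then installs the result.
  // log_ok simulates whether the manifest write succeeded.
  bool LogAndApply(const ToyEdit& edit, bool log_ok) {
    assert(current_ != nullptr);
    auto next = std::make_shared<ToyVersion>(*current_);  // copy, never mutate
    for (uint64_t f : edit.deleted) next->files.erase(f);
    for (uint64_t f : edit.added) next->files.insert(f);
    if (!log_ok) return false;  // failed write: the new version is dropped
    manifest_.push_back(edit);  // stands in for AddRecord + Sync
    current_ = next;            // stands in for AppendVersion
    return true;
  }

  std::shared_ptr<const ToyVersion> current() const { return current_; }
  size_t manifest_records() const { return manifest_.size(); }

 private:
  std::shared_ptr<ToyVersion> current_;
  std::vector<ToyEdit> manifest_;
};
```

A reader that grabbed `current()` before a later edit still sees the old file set afterwards, which is the reason several Versions can be alive at the same time.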
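3. When reading data: LevelDB reads data through the TableCache object held by VersionSet.
TableCache is the cache class for SSTables. Its NewIterator method takes a file number and returns an Iterator over that file for external use.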
class TableCache {
 public:
  TableCache(const std::string& dbname, const Options* options, int entries);
  ~TableCache();

  // Return an iterator for the specified file number (the corresponding
  // file length must be exactly "file_size" bytes).  If "tableptr" is
  // non-NULL, also sets "*tableptr" to point to the Table object
  // underlying the returned iterator, or NULL if no Table object underlies
  // the returned iterator.  The returned "*tableptr" object is owned by
  // the cache and should not be deleted, and is valid for as long as the
  // returned iterator is live.
  Iterator* NewIterator(const ReadOptions& options,
                        uint64_t file_number,
                        uint64_t file_size,
                        Table** tableptr = NULL);

  // Evict any entry for the specified file number
  void Evict(uint64_t file_number);

 private:
  Env* const env_;
  const std::string dbname_;
  const Options* options_;
  Cache* cache_;
};
Caching is implemented mainly by the Cache object; notes on Cache will come in the next section.
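As a rough picture of what a cache keyed by file number does, here is a minimal sketch that assumes a least-recently-used eviction policy. `ToyTableCache` and its `Find` method are hypothetical, and the "opened table" is just a string; only the `Evict` name mirrors the TableCache declaration above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical toy cache: file number -> "opened table" (a string here).
class ToyTableCache {
 public:
  explicit ToyTableCache(size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  // Returns the cached table, opening it on a miss. *hit reports which happened.
  const std::string& Find(uint64_t file_number, bool* hit) {
    auto it = index_.find(file_number);
    if (it != index_.end()) {
      *hit = true;
      lru_.splice(lru_.begin(), lru_, it->second);  // move entry to the front
      return it->second->second;
    }
    *hit = false;
    if (lru_.size() == capacity_) {  // evict the least recently used entry
      index_.erase(lru_.back().first);
      lru_.pop_back();
    }
    lru_.emplace_front(file_number, "table-" + std::to_string(file_number));
    index_[file_number] = lru_.begin();
    return lru_.front().second;
  }

  // Plays the role of TableCache::Evict: drop one file's entry if present.
  void Evict(uint64_t file_number) {
    auto it = index_.find(file_number);
    if (it == index_.end()) return;
    lru_.erase(it->second);
    index_.erase(it);
  }

 private:
  typedef std::list<std::pair<uint64_t, std::string>> Entries;
  size_t capacity_;
  Entries lru_;  // front = most recently used
  std::unordered_map<uint64_t, Entries::iterator> index_;
};
```

An explicit `Evict` matters once a compaction has deleted a file: without it, the stale entry would linger until the eviction policy happened to push it out.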
Version
A Version maintains the SSTable metadata for one version of the database, and most of the interfaces it exposes are metadata-related as well:
void GetOverlappingInputs(
    int level,
    const InternalKey* begin,  // NULL means before all keys
    const InternalKey* end,    // NULL means after all keys
    std::vector<FileMetaData*>* inputs);

// Returns true iff some file in the specified level overlaps
// some part of [*smallest_user_key,*largest_user_key].
// smallest_user_key==NULL represents a key smaller than all keys in the DB.
// largest_user_key==NULL represents a key larger than all keys in the DB.
bool OverlapInLevel(int level,
                    const Slice* smallest_user_key,
                    const Slice* largest_user_key);

// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
                               const Slice& largest_user_key);

int NumFiles(int level) const { return files_[level].size(); }
There are also two methods related to database reads, Get and UpdateStats. Here is Get:
Status Version::Get(const ReadOptions& options,
                    const LookupKey& k,
                    std::string* value,
                    GetStats* stats)
{
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();
  const Comparator* ucmp = vset_->icmp_.user_comparator();
  Status s;

  stats->seek_file = NULL;
  stats->seek_file_level = -1;
  FileMetaData* last_file_read = NULL;
  int last_file_read_level = -1;

  // We can search level-by-level since entries never hop across
  // levels.  Therefore we are guaranteed that if we find data
  // in a smaller level, later levels are irrelevant.
  std::vector<FileMetaData*> tmp;
  FileMetaData* tmp2;

  // 1. Find all files that may contain the given key
  for (int level = 0; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Get the list of files to search in this level
    FileMetaData* const* files = &files_[level][0];
    if (level == 0) {  // 1.1 In level 0, several files may contain the key
      // Level-0 files may overlap each other.  Find all files that
      // overlap user_key and process them in order from newest to oldest.
      tmp.reserve(num_files);
      for (uint32_t i = 0; i < num_files; i++) {
        FileMetaData* f = files[i];
        if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
            ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
          tmp.push_back(f);
        }
      }
      if (tmp.empty()) continue;

      std::sort(tmp.begin(), tmp.end(), NewestFirst);  // order files newest first
      files = &tmp[0];
      num_files = tmp.size();
    }
    else {  // 1.2 Above level 0, a key can live in at most one file
      // Binary search to find earliest index whose largest key >= ikey.
      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
      if (index >= num_files) {
        files = NULL;
        num_files = 0;
      }
      else {
        tmp2 = files[index];
        if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
          // All of "tmp2" is past any data for user_key
          files = NULL;
          num_files = 0;
        }
        else {
          files = &tmp2;
          num_files = 1;
        }
      }
    }

    // 2. Walk the candidate files and look up the value for the key
    for (uint32_t i = 0; i < num_files; ++i) {
      if (last_file_read != NULL && stats->seek_file == NULL) {
        // We have had more than one seek for this read.  Charge the 1st file.
        stats->seek_file = last_file_read;
        stats->seek_file_level = last_file_read_level;
      }

      FileMetaData* f = files[i];
      last_file_read = f;
      last_file_read_level = level;

      // 2.1 SSTable iterator
      Iterator* iter = vset_->table_cache_->NewIterator(
          options,
          f->number,
          f->file_size);
      iter->Seek(ikey);  // 2.2 Seek to the given key
      const bool done = GetValue(iter, user_key, value, &s);  // 2.3 Get the value
      if (!iter->status().ok()) {
        s = iter->status();
        delete iter;
        return s;
      }
      else {
        delete iter;
        if (done) {
          return s;
        }
      }
    }
  }

  return Status::NotFound(Slice());  // Use an empty error message for speed
}
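The two file-selection strategies in Get (steps 1.1 and 1.2) can be isolated in a small sketch. It rests on simplifying assumptions: `ToyFile` uses plain `std::string` keys with the default ordering instead of InternalKey and a Comparator, both function names are hypothetical, and "newest" is taken to mean the highest file number.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified file metadata using plain string keys.
struct ToyFile {
  uint64_t number;
  std::string smallest;
  std::string largest;
};

// Level 0: files may overlap, so return every file whose range covers key,
// newest first (assumed here to mean the highest file number first).
std::vector<uint64_t> CandidatesLevel0(const std::vector<ToyFile>& files,
                                       const std::string& key) {
  std::vector<uint64_t> out;
  for (const ToyFile& f : files) {
    if (key >= f.smallest && key <= f.largest) out.push_back(f.number);
  }
  std::sort(out.begin(), out.end(),
            [](uint64_t a, uint64_t b) { return a > b; });
  return out;
}

// Deeper levels: files are sorted and disjoint, so binary-search for the first
// file whose largest key is >= key; at most one file can contain the key.
std::vector<uint64_t> CandidatesSortedLevel(const std::vector<ToyFile>& files,
                                            const std::string& key) {
  for (size_t i = 1; i < files.size(); ++i) {
    assert(files[i - 1].largest < files[i].smallest);  // sorted and disjoint
  }
  auto it = std::lower_bound(
      files.begin(), files.end(), key,
      [](const ToyFile& f, const std::string& k) { return f.largest < k; });
  if (it == files.end() || key < it->smallest) return {};
  return {it->number};
}
```

Level 0 costs a linear scan and may yield several candidates, while every deeper level costs one binary search and yields at most one. That difference is why level 0 gets its own branch.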
VersionEdit
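Apart from changes to the runtime numbers, the main difference between versions is which SSTable files were added and removed. When a compaction runs, some SSTables inevitably become obsolete and are removed, while the newly merged SSTables are added to the database. VersionEdit provides AddFile and DeleteFile to record these changes.
The other main interface VersionEdit provides is declared as follows: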
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
These are a pair of serialization and deserialization methods; the serialized records are stored in the Manifest file.
void VersionEdit::EncodeTo(std::string* dst) const {
  // 1. Serialize the comparator
  if (has_comparator_) {
    PutVarint32(dst, kComparator);
    PutLengthPrefixedSlice(dst, comparator_);
  }
  // 2. Serialize the runtime numbers
  if (has_log_number_) {
    PutVarint32(dst, kLogNumber);
    PutVarint64(dst, log_number_);
  }
  if (has_prev_log_number_) {
    PutVarint32(dst, kPrevLogNumber);
    PutVarint64(dst, prev_log_number_);
  }
  if (has_next_file_number_) {
    PutVarint32(dst, kNextFileNumber);
    PutVarint64(dst, next_file_number_);
  }
  if (has_last_sequence_) {
    PutVarint32(dst, kLastSequence);
    PutVarint64(dst, last_sequence_);
  }
  // 3. Serialize the compact pointers
  for (size_t i = 0; i < compact_pointers_.size(); i++) {
    PutVarint32(dst, kCompactPointer);
    PutVarint32(dst, compact_pointers_[i].first);  // level
    PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
  }

  // 4. Serialize the SSTable files changed by this version change
  for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
       iter != deleted_files_.end();
       ++iter) {
    PutVarint32(dst, kDeletedFile);
    PutVarint32(dst, iter->first);   // level
    PutVarint64(dst, iter->second);  // file number
  }

  for (size_t i = 0; i < new_files_.size(); i++) {
    const FileMetaData& f = new_files_[i].second;
    PutVarint32(dst, kNewFile);
    PutVarint32(dst, new_files_[i].first);  // level
    PutVarint64(dst, f.number);
    PutVarint64(dst, f.file_size);
    PutLengthPrefixedSlice(dst, f.smallest.Encode());
    PutLengthPrefixedSlice(dst, f.largest.Encode());
  }
}
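EncodeTo writes each field as a tag followed by a value, both as varints. The sketch below shows the general base-128 varint technique with hypothetical helpers `ToyPutVarint`, `ToyGetVarint`, and `ToyPutField`; their signatures are made up for this illustration and are not LevelDB's coding functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Appends v as a base-128 varint: 7 payload bits per byte, low bits first,
// with the high bit of each byte meaning "more bytes follow".
void ToyPutVarint(std::string* dst, uint64_t v) {
  assert(dst != nullptr);
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Decodes one varint starting at *pos and advances *pos past it.
// Returns false on truncated or overlong input.
bool ToyGetVarint(const std::string& src, size_t* pos, uint64_t* out) {
  assert(pos != nullptr && out != nullptr);
  uint64_t result = 0;
  for (int shift = 0; shift <= 63 && *pos < src.size(); shift += 7) {
    uint64_t byte = static_cast<unsigned char>(src[(*pos)++]);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      *out = result;
      return true;
    }
  }
  return false;
}

// Writes one tagged numeric field, in the same tag-then-value shape as above.
void ToyPutField(std::string* dst, uint32_t tag, uint64_t value) {
  ToyPutVarint(dst, tag);
  ToyPutVarint(dst, value);
}
```

Small numbers such as tags and levels take a single byte this way, and because every field carries its own tag, a decoder can read fields in any order and skip the ones a record does not contain.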
Back to the opening question: what is version information for?
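Every LevelDB database has a CURRENT file whose only content is the name of the database's current Manifest file. The Manifest contains the complete version information as of the last run, and LevelDB restores its version information from the Manifest.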
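Resolving the Manifest path from the CURRENT file amounts to very little code. The helper below is a hypothetical sketch, not a LevelDB function: it assumes the contents of CURRENT are a single file name terminated by a newline, which matches the `current.resize(current.size() - 1)` step in Recover, and the validation it performs is an assumption added for illustration.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: turns the contents of CURRENT into a manifest path.
// Assumes the contents are a single file name terminated by '\n'.
bool ManifestPathFromCurrent(const std::string& dbname,
                             const std::string& current_contents,
                             std::string* path) {
  assert(path != nullptr);
  if (current_contents.size() < 2 || current_contents.back() != '\n') {
    return false;  // empty name or missing trailing newline
  }
  // Drop the trailing newline, then join with the database directory.
  std::string name = current_contents.substr(0, current_contents.size() - 1);
  *path = dbname + "/" + name;
  return true;
}
```

Since CURRENT holds nothing but a name, switching to a new Manifest only requires rewriting this one small file.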
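LevelDB's version information is a semantically rich group of functionality; what it contains goes far beyond the definition of a version itself. It would arguably be more reasonable to make Version a plain struct, VersionSet nothing more than a list of Versions, and VersionEdit plain structured data as well, and then provide separate functional classes that operate on those structures. As it stands, this should count as a code smell in LevelDB's implementation.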