前文記錄html
leveldb 學習記錄(一) skiplist
leveldb 學習記錄(二) Slice
leveldb 學習記錄(三) MemTable 與 Immutable Memtable
leveldb 學習記錄(四) skiplist補完數據庫
KV數據庫中 大部分是採用內存存儲,若是中途發生意外狀況,沒有dump到磁盤的記錄就可能會丟失,可是若是採用log記錄操做即可以按照log記錄進行這部分的數據恢復數據結構
因此,咱們在每次操做kv記錄的時候都須要將操做記錄到log文件中。app
每一個日誌文件都會切分爲32KB的BLOCK,BLOCK來記錄那些操做RECORD,可是不保證RECORD長度固定。因此有了如下設計函數
record := checksum: uint32 // crc32c of type and data[]
length: uint16
type: uint8 // One of FULL, FIRST, MIDDLE, LAST
data: uint8[length] post
同時也不保證RECORD不跨BLOCK記錄學習
因此RECORD的類型有 FULL, FIRST, MIDDLE, LAST四種類型ui
當一個RECORD在一個BLOCK內 那麼它的類型是FULLthis
不然跨BLOCK記錄RECORD的時候 記錄能夠分爲FIRST, MIDDLE, LASTurl
如圖
上圖能夠看到LOG文件由三個BLOCK組成BLOCK1 BLOCK2 BLOCK3
不一樣的RECORD 分配以下
BLOCK1 RECORDA整個數據都在BLOCK1中,因此他的類型是FULL 。接着是 RECORDB的部分數據 類型爲FIRST
BLOCK2 RECORDB的數據, 因爲部分數據在BLOCK1和BLOCK3中,因此這部分RECORDB的類型是MIDDLE
BLOCK3 首先是RECORDB的數據,類型是LAST。 緊接着是RECORDC,這部分數據類型爲FULL
record分爲校驗和,長度,類型和數據。
對應的相關LOG 數據結構以下
1 enum RecordType { 2 // Zero is reserved for preallocated files 3 kZeroType = 0, 4 5 kFullType = 1, 6 7 // For fragments 8 kFirstType = 2, 9 kMiddleType = 3, 10 kLastType = 4 11 }; 12 static const int kMaxRecordType = kLastType; 13 14 static const int kBlockSize = 32768; 15 16 // Header is checksum (4 bytes), type (1 byte), length (2 bytes). 17 static const int kHeaderSize = 4 + 1 + 2; 18 19 } 20 }
寫日誌類Writer:
頭文件
1 class Writer { 2 public: 3 // Create a writer that will append data to "*dest". 4 // "*dest" must be initially empty. 5 // "*dest" must remain live while this Writer is in use. 6 explicit Writer(WritableFile* dest); 7 ~Writer(); 8 9 Status AddRecord(const Slice& slice); 10 11 private: 12 WritableFile* dest_; 13 int block_offset_; // Current offset in block 14 15 // crc32c values for all supported record types. These are 16 // pre-computed to reduce the overhead of computing the crc of the 17 // record type stored in the header. 18 uint32_t type_crc_[kMaxRecordType + 1]; 19 20 Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); 21 22 // No copying allowed 23 Writer(const Writer&); 24 void operator=(const Writer&); 25 };
除開構造函數,主要來看看AddRecord和EmitPhysicalRecord函數
1 Status Writer::AddRecord(const Slice& slice) { 2 const char* ptr = slice.data(); 3 size_t left = slice.size(); 4 5 // Fragment the record if necessary and emit it. Note that if slice 6 // is empty, we still want to iterate once to emit a single 7 // zero-length record 8 Status s; 9 bool begin = true; 10 do { 11 const int leftover = kBlockSize - block_offset_; //剩餘要填充的數據長度 是一個BLOCK的長度減去塊內已經填充的長度 12 assert(leftover >= 0); 13 if (leftover < kHeaderSize) { //要填充的長度大於7 則在下一個BLOCK進行記錄 (由於checksum 4字節 length2字節 type 1字節,光是記錄信息已經須要7個字節) 14 // Switch to a new block 15 if (leftover > 0) { 16 // Fill the trailer (literal below relies on kHeaderSize being 7) 17 assert(kHeaderSize == 7); 18 dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); 19 } 20 block_offset_ = 0; 21 } 22 23 // Invariant: we never leave < kHeaderSize bytes in a block. 24 assert(kBlockSize - block_offset_ - kHeaderSize >= 0); 25 26 const size_t avail = kBlockSize - block_offset_ - kHeaderSize; 27 const size_t fragment_length = (left < avail) ? left : avail; //根據可否在本BLOCK填充完畢 選擇填充長度爲left 或者 avail 28 29 RecordType type; 30 const bool end = (left == fragment_length); 31 if (begin && end) { //beg end在用一個BLOCK裏 record的type確定是FULL 32 type = kFullType; 33 } else if (begin) { //本BLOCK只有beg 那麼record的type 就是FIRST 34 type = kFirstType; 35 } else if (end) { //本BLOCK只有end 那麼record的TYPE就是last 36 type = kLastType; 37 } else { 38 type = kMiddleType; //本BLOCK 沒有beg end 那麼record填充了整個BLOCK type是MIDDLE 39 } 40 41 s = EmitPhysicalRecord(type, ptr, fragment_length); //提交到log文件記錄 42 ptr += fragment_length; 43 left -= fragment_length; 44 begin = false; 45 } while (s.ok() && left > 0); 46 return s; 47 } 48 49 Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { 50 assert(n <= 0xffff); // Must fit in two bytes 51 assert(block_offset_ + kHeaderSize + n <= kBlockSize); 52 53 // Format the header 54 char buf[kHeaderSize]; 55 buf[4] = static_cast<char>(n & 0xff); //長度低8位 56 buf[5] = static_cast<char>(n >> 8); //長度高8位 57 buf[6] = static_cast<char>(t); //type 58 59 // Compute the crc of the record type and the payload. 60 uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n); //校驗和 61 crc = crc32c::Mask(crc); // Adjust for storage 62 EncodeFixed32(buf, crc); 63 64 // Write the header and the payload 65 Status s = dest_->Append(Slice(buf, kHeaderSize)); //數據信息寫入 66 if (s.ok()) { 67 s = dest_->Append(Slice(ptr, n)); //數據寫入 68 if (s.ok()) { 69 s = dest_->Flush(); 70 } 71 } 72 block_offset_ += kHeaderSize + n; 73 return s; 74 }
//========================================================
讀日誌類Reader:
日誌讀取代碼中還有一個Reporter 類用於報告錯誤
1 class Reader { 2 public: 3 // Interface for reporting errors. 4 class Reporter { 5 public: 6 virtual ~Reporter(); 7 8 // Some corruption was detected. "size" is the approximate number 9 // of bytes dropped due to the corruption. 10 virtual void Corruption(size_t bytes, const Status& status) = 0; 11 }; 12 13 // Create a reader that will return log records from "*file". 14 // "*file" must remain live while this Reader is in use. 15 // 16 // If "reporter" is non-NULL, it is notified whenever some data is 17 // dropped due to a detected corruption. "*reporter" must remain 18 // live while this Reader is in use. 19 // 20 // If "checksum" is true, verify checksums if available. 21 // 22 // The Reader will start reading at the first record located at physical 23 // position >= initial_offset within the file. 24 Reader(SequentialFile* file, Reporter* reporter, bool checksum, 25 uint64_t initial_offset); 26 27 ~Reader(); 28 29 // Read the next record into *record. Returns true if read 30 // successfully, false if we hit end of the input. May use 31 // "*scratch" as temporary storage. The contents filled in *record 32 // will only be valid until the next mutating operation on this 33 // reader or the next mutation to *scratch. 34 bool ReadRecord(Slice* record, std::string* scratch); 35 36 // Returns the physical offset of the last record returned by ReadRecord. 37 // 38 // Undefined before the first call to ReadRecord. 39 uint64_t LastRecordOffset(); 40 41 private: 42 SequentialFile* const file_; 43 Reporter* const reporter_; 44 bool const checksum_; 45 char* const backing_store_; 46 Slice buffer_; 47 bool eof_; // Last Read() indicated EOF by returning < kBlockSize 48 49 // Offset of the last record returned by ReadRecord. 50 uint64_t last_record_offset_; 51 // Offset of the first location past the end of buffer_. 52 uint64_t end_of_buffer_offset_; 53 54 // Offset at which to start looking for the first record to return 55 uint64_t const initial_offset_; 56 57 // Extend record types with the following special values 58 enum { 59 kEof = kMaxRecordType + 1, 60 // Returned whenever we find an invalid physical record. 61 // Currently there are three situations in which this happens: 62 // * The record has an invalid CRC (ReadPhysicalRecord reports a drop) 63 // * The record is a 0-length record (No drop is reported) 64 // * The record is below constructor's initial_offset (No drop is reported) 65 kBadRecord = kMaxRecordType + 2 66 }; 67 68 // Skips all blocks that are completely before "initial_offset_". 69 // 70 // Returns true on success. Handles reporting. 71 bool SkipToInitialBlock(); 72 73 // Return type, or one of the preceding special values 74 unsigned int ReadPhysicalRecord(Slice* result); 75 76 // Reports dropped bytes to the reporter. 77 // buffer_ must be updated to remove the dropped bytes prior to invocation. 78 void ReportCorruption(size_t bytes, const char* reason); 79 void ReportDrop(size_t bytes, const Status& reason); 80 81 // No copying allowed 82 Reader(const Reader&); 83 void operator=(const Reader&); 84 };
關鍵函數是bool Reader::ReadRecord(Slice* record, std::string* scratch)
個人理解中 只要除開徹底被 initial_offset_長度覆蓋的BLOCK ,
剩下的BLOCK依次讀取記錄,根據type是FULL MIDDLE FIRST LAST 決定是否繼續讀取便可
可是源碼中的例外情形太多,看的不是太明白,這個留待實際操做在深刻研究吧
bool Reader::ReadRecord(Slice* record, std::string* scratch) { if (last_record_offset_ < initial_offset_) { //實際上整個工程中initial_offset_一直爲0 , if (!SkipToInitialBlock()) { //block_start_location圓整爲包含initial_offset_的BLOCK的偏移 return false; } } scratch->clear(); record->clear(); bool in_fragmented_record = false; // Record offset of the logical record that we're reading // 0 is a dummy value to make compilers happy uint64_t prospective_record_offset = 0; Slice fragment; while (true) { uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); const unsigned int record_type = ReadPhysicalRecord(&fragment); switch (record_type) { case kFullType: //一次性讀取FULL類型的record 直接返回成功 if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (scratch->empty()) { in_fragmented_record = false; } else { ReportCorruption(scratch->size(), "partial record without end(1)"); } } prospective_record_offset = physical_record_offset; scratch->clear(); *record = fragment; last_record_offset_ = prospective_record_offset; return true; case kFirstType: //讀取到FIRST類型的record string.assign 而後繼續 if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (scratch->empty()) { in_fragmented_record = false; } else { ReportCorruption(scratch->size(), "partial record without end(2)"); } } prospective_record_offset = physical_record_offset; scratch->assign(fragment.data(), fragment.size()); in_fragmented_record = true; break; case kMiddleType: //讀取到MIDDLE類型的record string.append 而後繼續 if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(1)"); } else { scratch->append(fragment.data(), fragment.size()); } break; case kLastType: //讀取到LAST 類型record string.append if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(2)"); } else { scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); last_record_offset_ = prospective_record_offset; return true; } break; case kEof: if (in_fragmented_record) { ReportCorruption(scratch->size(), "partial record without end(3)"); scratch->clear(); } return false; case kBadRecord: if (in_fragmented_record) { ReportCorruption(scratch->size(), "error in middle of record"); in_fragmented_record = false; scratch->clear(); } break; default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", record_type); ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), buf); in_fragmented_record = false; scratch->clear(); break; } } } return false; }
參考:
https://blog.csdn.net/tankles/article/details/7663873