leveldb 學習記錄(四)Log文件

前文記錄

leveldb 學習記錄(一) skiplist
leveldb 學習記錄(二) Slice
leveldb 學習記錄(三) MemTable 與 Immutable Memtable
leveldb 學習記錄(四) skiplist補完

 

KV數據庫大部分採用內存存儲,如果中途發生意外狀況,沒有dump到磁盤的記錄就可能會丟失;但是如果採用log記錄操作,就可以按照log記錄恢復這部分數據。

所以,我們在每次操作kv記錄的時候都需要將操作記錄到log文件中。

每個日誌文件都會切分爲32KB的BLOCK,用BLOCK來記錄那些操作RECORD,但是不保證RECORD長度固定。所以有了以下設計:

record :=      checksum: uint32          // crc32c of type and data[]
                  length: uint16
                  type: uint8                           // One of FULL, FIRST, MIDDLE, LAST
                  data: uint8[length]

同時也不保證RECORD不跨BLOCK記錄。

所以RECORD的類型有 FULL, FIRST, MIDDLE, LAST 四種。

當一個RECORD完整地位於一個BLOCK內時,它的類型是FULL。

否則,跨BLOCK記錄RECORD的時候,記錄可以分爲FIRST, MIDDLE, LAST。

如圖

上圖可以看到LOG文件由三個BLOCK組成:BLOCK1、BLOCK2、BLOCK3

不同的RECORD分配如下:

BLOCK1  RECORDA整個數據都在BLOCK1中,所以它的類型是FULL。接着是RECORDB的部分數據,類型爲FIRST

BLOCK2  RECORDB的數據,由於其餘部分數據在BLOCK1和BLOCK3中,所以這部分RECORDB的類型是MIDDLE

BLOCK3  首先是RECORDB的數據,類型是LAST。 緊接着是RECORDC,這部分數據類型爲FULL

 

record分爲校驗和,長度,類型和數據。

對應的相關LOG數據結構如下

 1 enum RecordType {
 2   // Zero is reserved for preallocated files
 3   kZeroType = 0,
 4 
 5   kFullType = 1,  // Whole record is contained in a single fragment
 6 
 7   // For fragments
 8   kFirstType = 2,   // First fragment of a record that continues in a later block
 9   kMiddleType = 3,  // Fragment that neither starts nor finishes the record
10   kLastType = 4     // Final fragment of a record that started in an earlier block
11 };
12 static const int kMaxRecordType = kLastType;  // Largest valid RecordType value
13 
14 static const int kBlockSize = 32768;  // Size in bytes of one log block (32 KB)
15 
16 // Header is checksum (4 bytes), length (2 bytes), type (1 byte): 7 bytes in total.
17 static const int kHeaderSize = 4 + 1 + 2;
18 
19 }
20 }

 

 

寫日誌類Writer:

頭文件

 1 class Writer {  // Appends records to a log file, fragmenting them per block
 2  public:
 3   // Create a writer that will append data to "*dest".
 4   // "*dest" must be initially empty.
 5   // "*dest" must remain live while this Writer is in use.
 6   explicit Writer(WritableFile* dest);
 7   ~Writer();
 8   // Append "slice" as one logical record, split into fragments where needed.
 9   Status AddRecord(const Slice& slice);
10 
11  private:
12   WritableFile* dest_;     // Destination file; must outlive this Writer
13   int block_offset_;       // Current offset in block
14 
15   // crc32c values for all supported record types.  These are
16   // pre-computed to reduce the overhead of computing the crc of the
17   // record type stored in the header.
18   uint32_t type_crc_[kMaxRecordType + 1];
19   // Write one physical record (header followed by payload) to dest_.
20   Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
21 
22   // No copying allowed
23   Writer(const Writer&);
24   void operator=(const Writer&);
25 };

除開構造函數,主要來看看AddRecord和EmitPhysicalRecord函數

 

 1 Status Writer::AddRecord(const Slice& slice) {  // Appends slice as one logical record, emitted as one or more fragments
 2   const char* ptr = slice.data();
 3   size_t left = slice.size();
 4 
 5   // Fragment the record if necessary and emit it.  Note that if slice
 6   // is empty, we still want to iterate once to emit a single
 7   // zero-length record
 8   Status s;
 9   bool begin = true;
10   do {
11     const int leftover = kBlockSize - block_offset_;        // Bytes still free in the current block
12     assert(leftover >= 0);
13     if (leftover < kHeaderSize) {                            // Fewer than 7 bytes free: not even a header (4-byte checksum, 2-byte length, 1-byte type) fits
14       // Switch to a new block
15       if (leftover > 0) {
16         // Fill the trailer (literal below relies on kHeaderSize being 7)
17         assert(kHeaderSize == 7);
18         dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));  // NOTE(review): the returned Status is not checked
19       }
20       block_offset_ = 0;
21     }
22 
23     // Invariant: we never leave < kHeaderSize bytes in a block.
24     assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
25 
26     const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
27     const size_t fragment_length = (left < avail) ? left : avail;        // Emit all of left if it fits in this block, otherwise only avail bytes
28 
29     RecordType type;
30     const bool end = (left == fragment_length);
31     if (begin && end) {                                // Record starts and ends in this fragment: FULL
32       type = kFullType;    
33     } else if (begin) {                                // Record starts here but does not finish: FIRST
34       type = kFirstType;
35     } else if (end) {                                // Record started earlier and finishes here: LAST
36       type = kLastType;
37     } else {
38       type = kMiddleType;                            // Record neither starts nor finishes in this fragment: MIDDLE
39     }
40 
41     s = EmitPhysicalRecord(type, ptr, fragment_length);        // Write this fragment to the log file
42     ptr += fragment_length;
43     left -= fragment_length;
44     begin = false;
45   } while (s.ok() && left > 0);
46   return s;
47 }
48 
49 Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {  // Writes one header + n-byte payload to dest_ and flushes
50   assert(n <= 0xffff);  // Must fit in two bytes
51   assert(block_offset_ + kHeaderSize + n <= kBlockSize);
52 
53   // Format the header
54   char buf[kHeaderSize];
55   buf[4] = static_cast<char>(n & 0xff);                // Low 8 bits of the length
56   buf[5] = static_cast<char>(n >> 8);                // High 8 bits of the length
57   buf[6] = static_cast<char>(t);                    // Record type
58 
59   // Compute the crc of the record type and the payload.
60   uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);                // Extend the pre-computed type crc over the payload
61   crc = crc32c::Mask(crc);                 // Adjust for storage
62   EncodeFixed32(buf, crc);
63 
64   // Write the header and the payload
65   Status s = dest_->Append(Slice(buf, kHeaderSize));        // Write the 7-byte header
66   if (s.ok()) {
67     s = dest_->Append(Slice(ptr, n));                        // Write the payload
68     if (s.ok()) {
69       s = dest_->Flush();
70     }
71   }
72   block_offset_ += kHeaderSize + n;  // NOTE(review): advanced even when a write above failed
73   return s;
74 }

 

//========================================================

讀日誌類Reader:

日誌讀取代碼中還有一個Reporter 類用於報告錯誤

 

 

 1 class Reader {  // Returns log records read from a SequentialFile
 2  public:
 3   // Interface for reporting errors.
 4   class Reporter {
 5    public:
 6     virtual ~Reporter();
 7 
 8     // Some corruption was detected.  "bytes" is the approximate number
 9     // of bytes dropped due to the corruption.
10     virtual void Corruption(size_t bytes, const Status& status) = 0;
11   };
12 
13   // Create a reader that will return log records from "*file".
14   // "*file" must remain live while this Reader is in use.
15   //
16   // If "reporter" is non-NULL, it is notified whenever some data is
17   // dropped due to a detected corruption.  "*reporter" must remain
18   // live while this Reader is in use.
19   //
20   // If "checksum" is true, verify checksums if available.
21   //
22   // The Reader will start reading at the first record located at physical
23   // position >= initial_offset within the file.
24   Reader(SequentialFile* file, Reporter* reporter, bool checksum,
25          uint64_t initial_offset);
26 
27   ~Reader();
28 
29   // Read the next record into *record.  Returns true if read
30   // successfully, false if we hit end of the input.  May use
31   // "*scratch" as temporary storage.  The contents filled in *record
32   // will only be valid until the next mutating operation on this
33   // reader or the next mutation to *scratch.
34   bool ReadRecord(Slice* record, std::string* scratch);
35 
36   // Returns the physical offset of the last record returned by ReadRecord.
37   //
38   // Undefined before the first call to ReadRecord.
39   uint64_t LastRecordOffset();
40 
41  private:
42   SequentialFile* const file_;      // Source file; must outlive this Reader
43   Reporter* const reporter_;        // May be NULL; notified when data is dropped
44   bool const checksum_;             // Whether to verify checksums if available
45   char* const backing_store_;       // presumably the storage behind buffer_ — TODO confirm
46   Slice buffer_;                    // presumably the not-yet-consumed bytes read from file_ — TODO confirm
47   bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
48 
49   // Offset of the last record returned by ReadRecord.
50   uint64_t last_record_offset_;
51   // Offset of the first location past the end of buffer_.
52   uint64_t end_of_buffer_offset_;
53 
54   // Offset at which to start looking for the first record to return
55   uint64_t const initial_offset_;
56 
57   // Extend record types with the following special values
58   enum {
59     kEof = kMaxRecordType + 1,
60     // Returned whenever we find an invalid physical record.
61     // Currently there are three situations in which this happens:
62     // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
63     // * The record is a 0-length record (No drop is reported)
64     // * The record is below constructor's initial_offset (No drop is reported)
65     kBadRecord = kMaxRecordType + 2
66   };
67 
68   // Skips all blocks that are completely before "initial_offset_".
69   //
70   // Returns true on success. Handles reporting.
71   bool SkipToInitialBlock();
72 
73   // Return type, or one of the preceding special values
74   unsigned int ReadPhysicalRecord(Slice* result);
75 
76   // Reports dropped bytes to the reporter.
77   // buffer_ must be updated to remove the dropped bytes prior to invocation.
78   void ReportCorruption(size_t bytes, const char* reason);
79   void ReportDrop(size_t bytes, const Status& reason);
80 
81   // No copying allowed
82   Reader(const Reader&);
83   void operator=(const Reader&);
84 };

 

關鍵函數是bool Reader::ReadRecord(Slice* record, std::string* scratch) 

我的理解是:只要除去完全被 initial_offset_ 覆蓋的BLOCK,

剩下的BLOCK依次讀取記錄,根據type是FULL MIDDLE FIRST LAST 決定是否繼續讀取即可

但是源碼中的例外情形太多,看得不是太明白,這個留待實際操作時再深入研究吧

// Reads the next logical record into *record, using *scratch to reassemble
// records that were split into FIRST/MIDDLE/LAST fragments.
//
// Returns true once a complete record (a FULL fragment, or a FIRST..LAST
// sequence) has been produced, and false when ReadPhysicalRecord reports
// kEof or when the initial skip to initial_offset_ fails. Unexpected or
// bad fragments are reported through ReportCorruption and then skipped.
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {                    // Article author's note: initial_offset_ is said to always be 0 in the project (not verifiable from this listing)
    if (!SkipToInitialBlock()) {                                // Skip the blocks that lie completely before initial_offset_
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    const unsigned int record_type = ReadPhysicalRecord(&fragment);
    switch (record_type) {
      case kFullType:                                // FULL fragment holds the whole record: return it right away
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:                        // FIRST fragment: start accumulating in scratch (assign) and keep reading
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:                            // MIDDLE fragment: append to scratch and keep reading
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:                            // LAST fragment: append to scratch and return the assembled record
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        // End of input: a record still being assembled is reported and dropped.
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "partial record without end(3)");
          scratch->clear();
        }
        return false;

      case kBadRecord:
        // Invalid physical record: abandon any partially assembled record
        // and continue with the next physical record.
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        // Unrecognised type: report this fragment plus anything accumulated.
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

 

 

 

 

 

 

 

 

 

 

參考:

https://blog.csdn.net/tankles/article/details/7663873

相關文章
相關標籤/搜索