【4.分佈式存儲】-leveldb-源碼-compact/sst/seek/version

mem到table

用到了內存集合爲塊4k。file中寫先寫64Kbuf再write、一個文件寫完調fsync
immem=>table
迭代器。循環。
計算當前,一塊一次flush(加入到fd的buffer),一塊加一次index block,
這裏簡單的每一個block一次write。append還作了留64kbuf。在write
一個table一次fsync.數據結構

BuildTableapp

1.add節點。
for (; iter->Valid(); iter->Next()) {
  Slice key = iter->key();
  meta->largest.DecodeFrom(key);
  builder->Add(key, iter->value());
}

2.加一些彙總meta block等
s = builder->Finish();
if (s.ok()) {
  meta->file_size = builder->FileSize();
  assert(meta->file_size > 0);
}
delete builder;

3.落盤
if (s.ok()) {
  s = file->Sync();  //封裝fsync
}
if (s.ok()) {
  s = file->Close();
}

針對1add
數據結構要算出各部分偏移量,維持4k。一塊加一個索引
每4k調一次Flush(本身實現)
Flush維護64K的buf。滿了調用write。不然只是放在buf中

add:
if (r->pending_index_entry) {
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }
 
  r->data_block.Add(key, value);  //r結構中改

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }

Flush:
WriteBlock(&r->data_block, &r->pending_handle);   //r->file->Append(block_contents);
if (ok()) {
    r->pending_index_entry = true;
    r->status = r->file->Flush();   //調動WriteUnbuffered真正write
}


這裏每64k調用一次write。不然是fd的buf
Status Append(const Slice& data) override {
    size_t write_size = data.size();
    const char* write_data = data.data();

    // Fit as much as possible into buffer.
    size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
    std::memcpy(buf_ + pos_, write_data, copy_size);
    write_data += copy_size;
    write_size -= copy_size;
    pos_ += copy_size;
    if (write_size == 0) {
      return Status::OK();
    }

    // Can't fit in buffer, so need to do at least one write.
    Status status = FlushBuffer();
    if (!status.ok()) {
      return status;
    }

    // Small writes go to buffer, large writes are written directly.
    if (write_size < kWritableFileBufferSize) {  //64K
      std::memcpy(buf_, write_data, write_size);
      pos_ = write_size;
      return Status::OK();
    }
    return WriteUnbuffered(write_data, write_size);
  }

log也用的這個。若Block已經小於header,填充0下一個Block。不然寫入block剩餘和left小的,加入first等標識。一段調一次加入頭信息/crc32c校驗和和數據,走append和flush。ide

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}

seek

在get時更新seek的方案,從sst找而且沒有找到的,第一個文件的是seek_file,在update_state時將其allow_seek--。當0時,file_to_compact_設爲該sst。在調度的時候優先比較size(0層個數,其餘層大於限制)再選seek
一個sst被訪問了太屢次有多是每次get都會查找這個sst,可是又沒有找到,那不如把這個sst合併到下一層,這樣下次就不用作無用的查找了
seek值的選取:函數

// We arrange to automatically compact this file after
      // a certain number of seeks.  Let's assume:
      //   (1) One seek costs 10ms
      //   (2) Writing or reading 1MB costs 10ms (100MB/s)
      //   (3) A compaction of 1MB does 25MB of IO:
      //         1MB read from this level
      //         10-12MB read from next level (boundaries may be misaligned)
      //         10-12MB written to next level
      // This implies that 25 seeks cost the same as the compaction
      // of 1MB of data.  I.e., one seek costs approximately the
      // same as the compaction of 40KB of data.  We are a little
      // conservative and allow approximately one seek for every 16KB
      // of data before triggering a compaction.
      // 在這裏更新allowed_seeks, 主要用於seek_compaction    
      f->allowed_seeks = (f->file_size / 16384);
      if (f->allowed_seeks < 100) f->allowed_seeks = 100;

compact

void PosixEnv::Schedule(
    void (*background_work_function)(void* background_work_arg),
    void* background_work_arg) {
  background_work_mutex_.Lock();

  // Start the background thread, if we haven't done so already.
  if (!started_background_thread_) {
    started_background_thread_ = true;
    std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
    background_thread.detach();
  }

  // If the queue is empty, the background thread may be waiting for work.
  if (background_work_queue_.empty()) {
    background_work_cv_.Signal();
  }

  background_work_queue_.emplace(background_work_function, background_work_arg);
  background_work_mutex_.Unlock();
}

void PosixEnv::BackgroundThreadMain() {
  while (true) {
    background_work_mutex_.Lock();

    // Wait until there is work to be done.
    while (background_work_queue_.empty()) {
      background_work_cv_.Wait();
    }

    assert(!background_work_queue_.empty());
    auto background_work_function = background_work_queue_.front().function;
    void* background_work_arg = background_work_queue_.front().arg;
    background_work_queue_.pop();

    background_work_mutex_.Unlock();
    background_work_function(background_work_arg);
  }
}

調用:
env_->Schedule(&DBImpl::BGWork, this);
void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

壓縮過程:
input是選擇出的全部文件的it集合。文件NewTwoLevelIterator或0層的table_cache_->NewIterator
input->SeekToFirst();
每一個input  builder->add 若超出大小就直接生成文件

compact->outfile->Sync()
調用LogAndApply生成新的version。寫入manifest中

version

version
提供了在當前版本搜索鍵值的Get方法,其次是爲上層調用提供了收集當前版本全部文件的迭代器,最後是爲合併文件提供了判斷鍵值範圍與文件是否有交集的輔助函數ui

Version_set這個類不僅是簡單的Version集合,還操做着和版本變化的一些函數,例如將version_edit應用到新的版本,將新版本設爲當前版本等等。this

version_edit這個類主要是兩個版本之間的差量,version_edit序列化以後,會保存在manifest文件中日誌

每當調用LogAndApply的時候,都會將VersionEdit做爲一筆記錄,追加寫入到MANIFEST文件。而且生成新version加入到版本鏈表。
將version_set內的文件內的文件編號保存進edit;
新建一個Version,而後調用Builder->apply和Builder->SaveTo方法將edit應用到新版本中.
將edit寫進manifest文件中,並更新Current文件,指向最新manifest.
將新版本添加到版本鏈表中,並設置爲當前鏈表.
Manifest:
使用的coparator名、log編號、前一個log編號、下一個文件編號、上一個序列號。這些都是日誌、sstable文件使用到的重要信息,這些字段不必定必然存在。
其次是compact點,可能有多個,寫入格式爲{kCompactPointer, level, internal key}。
其後是刪除文件,可能有多個,格式爲{kDeletedFile, level, file number}。
最後是新文件,可能有多個,格式爲
{kNewFile, level, file number, file size, min key, max key}。
對於版本間變更它是新加的文件集合,對於MANIFEST快照是該版本包含的全部sstable文件集合。
http://luodw.cc/2015/10/31/le...code

相關文章
相關標籤/搜索