【RocksDB】TransactionDB源碼分析

摘要: RocksDB版本:v5.13.4 1. 概述 得益於LSM-Tree結構,RocksDB全部的寫入並不是是update in-place,因此他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務全部寫操做在內存緩存打包,而後在commit時一次性將WriteBatch寫入,保證了原子,另外經過Sequence和Key鎖來解決衝突實現隔離。緩存

RocksDB版本:v5.13.4併發

  1. 概述

得益於LSM-Tree結構,RocksDB全部的寫入並不是是update in-place,因此他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務全部寫操做在內存緩存打包,而後在commit時一次性將WriteBatch寫入,保證了原子,另外經過Sequence和Key鎖來解決衝突實現隔離。app

RocksDB的Transaction分爲兩類:Pessimistic和Optimistic,相似悲觀鎖和樂觀鎖的區別,PessimisticTransaction的衝突檢測和加鎖是在事務中每次寫操做以前作的(commit後釋放),若是失敗則該操做失敗;OptimisticTransaction不加鎖,衝突檢測是在commit階段作的,commit時發現衝突則失敗。分佈式

具體使用時須要結合實際場景來選擇,若是併發事務寫入操做的Key重疊度不高,那麼用Optimistic更合適一些(省掉Pessimistic中額外的鎖操做)oop

  1. 用法

介紹實現原理前,先來看一下用法:性能

【1. 基本用法】ui

Options options;
TransactionDBOptions txn_db_options;
options.create_if_missing = true;
TransactionDB* txn_db;

// 打開DB(默認Pessimistic)
Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db);
assert(s.ok());

// 建立一個事務
Transaction* txn = txn_db->BeginTransaction(write_options);
assert(txn);

// 事務txn讀取一個key
s = txn->Get(read_options, "abc", &value);
assert(s.IsNotFound());

// 事務txn寫一個key
s = txn->Put("abc", "def");
assert(s.ok());

// 經過TransactionDB::Get在事務外讀取一個key
s = txn_db->Get(read_options, "abc", &value);

// 經過TrasactionDB::Put在事務外寫一個key
// 這裏並不會有影響,由於寫的不是"abc",不衝突
// 若是是"abc"的話
// 則Put會一直卡住直到超時或等待事務Commit(本例中會超時)
s = txn_db->Put(write_options, "xyz", "zzz");

s = txn->Commit();
assert(s.ok());
// 析構事務
delete txn;
delete txn_db;
經過BeginTransaction打開一個事務,而後調用Put、Get等接口進行事務操做,最後調用Commit進行提交。

【2. 回滾】this

...
// 事務txn寫入abc
s = txn->Put("abc", "def");
assert(s.ok());

// 設置回滾點
txn->SetSavePoint();

// 事務txn寫入cba
s = txn->Put("cba", "fed");
assert(s.ok());
// 回滾至回滾點
s = txn->RollbackToSavePoint();

// 提交,此時事務中不包含對cba的寫入
s = txn->Commit();
assert(s.ok());
...

【3. GetForUpdate】spa

...
// 事務txn讀取abc並獨佔該key,確保不被外部事務再修改
s = txn->GetForUpdate(read_options, 「abc」, &value);
assert(s.ok());

// 經過TransactionDB::Put接口在事務外寫abc
// 不會成功
s = txn_db->Put(write_options, 「abc」, 「value0」);

s = txn->Commit();
assert(s.ok());
...

有時候在事務中須要對某一個key進行先讀後寫,此時則不能在寫時才進行該key的獨佔及衝突檢測操做,因此使用GetForUpdate接口讀取該key並進行獨佔指針

【4. SetSnapshot】

txn = txn_db->BeginTransaction(write_options);
// 設置事務txn使用的snapshot爲當前全局Sequence Number
txn->SetSnapshot();

// 使用TransactionDB::Put接口在事務外部寫abc
// 此時全局Sequence Number會加1
db->Put(write_options, 「key1」, 「value0」);
assert(s.ok());

// 事務txn寫入abc
s = txn->Put(「abc」, 「value1」);
s = txn->Commit();
// 這裏會失敗,由於在事務設置了snapshot以後,事務後來寫的key
// 在事務外部有過其餘寫操做,因此這裏不會成功
// Pessimistic會在Put時失敗,Optimistic會在Commit時失敗

前面說過,TransactionDB在事務中須要寫入某個key時纔對其進行獨佔或衝突檢測,有時但願在事務一開始就對其以後全部要寫入的全部key進行獨佔,此時能夠經過SetSnapshot來實現,設置了Snapshot後,外部一旦對事務中將要進行寫操做key作過修改,則該事務最終會失敗(失敗點取決因而Pessimistic仍是Optimistic,Pessimistic由於在Put時就進行衝突檢測,因此Put時就失敗,而Optimistic則會在Commit是檢測到衝突,失敗)

  1. 實現

3.1 WriteBatch & WriteBatchWithIndex
WriteBatch就不展開說了,事務會將全部的寫操做追加進同一個WriteBatch,直到Commit時才向DB原子寫入。

WriteBatchWithIndex在WriteBatch以外,額外搞一個Skiplist來記錄每個操做在WriteBatch中的offset等信息。在事務沒有commit以前,數據還不在Memtable中,而是存在WriteBatch裏,若是有須要,這時候能夠經過WriteBatchWithIndex來拿到本身剛剛寫入的但尚未提交的數據。

事務的SetSavePoint和RollbackToSavePoint也是經過WriteBatch來實現的,SetSavePoint記錄當前WriteBatch的大小及統計信息,若干操做以後,若想回滾,則只須要將WriteBatch truncate到以前記錄的大小並恢復統計信息便可。

3.2 PessimisticTransaction
PessimisticTransactionDB經過TransactionLockMgr進行行鎖管理。事務中的每次寫入操做以前都須要TryLock進Key鎖的獨佔及衝突檢測,以Put爲例:

Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
                                const Slice& key, const Slice& value) {
  // 調用TryLock搶鎖及衝突檢測
  Status s =
      TryLock(column_family, key, false /* read_only */, true /* exclusive */);

  if (s.ok()) {
    s = GetBatchForWrite()->Put(column_family, key, value);
    if (s.ok()) {
      num_puts_++;
    }
  }

  return s;
}

能夠看到Put接口定義在TransactionBase中,不管Pessimistic仍是Optimistic的Put都是這段邏輯,兩者的區別是在對TryLock的重載。先看Pessimistic的,TransactionBaseImpl::TryLock經過TransactionBaseImpl::TryLock -> PessimisticTransaction::TryLock -> PessimisticTransactionDB::TryLock -> TransactionLockMgr::TryLock一路調用到TransactionLockMgr的TryLock,在裏面完成對key加鎖,加鎖成功便實現了對key的獨佔,此時直到事務commit以前,其餘事務是沒法修改這個key的。

鎖是加成功了,但這也只能說明今後刻起到事務結束前這個key不會再被外部修改,但若是事務在最開始執行SetSnapshot設置了快照,若是在打快照和Put之間的過程當中外部對相同key進行了修改(並commit),此時已經打破了snapshot的保證,因此事務以後的Put也不能成功,這個衝突檢測也是在PessimisticTransaction::TryLock中作的,以下:

Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
                                       const Slice& key, bool read_only,
                                       bool exclusive, bool skip_validate) {
  ...
  // 加鎖
  if (!previously_locked || lock_upgrade) {
    s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
  }

  SetSnapshotIfNeeded();

  ...
  
    // 使用事務一開始拿到的snapshot的sequence1與這個key在DB中最新
    // 的sequence2進行比較,若是sequence2 > sequence1則表明在snapshot
    // 以後,外部有對key進行過寫入,有衝突!
    s = ValidateSnapshot(column_family, key, &tracked_at_seq);

      if (!s.ok()) {
        // 檢測到衝突,解鎖
        // Failed to validate key
        if (!previously_locked) {
          // Unlock key we just locked
          if (lock_upgrade) {
            s = txn_db_impl_->TryLock(this, cfh_id, key_str,
                                      false /* exclusive */);
            assert(s.ok());
          } else {
            txn_db_impl_->UnLock(this, cfh_id, key.ToString());
          }
        }
      }
  
  if (s.ok()) {
    // 若是加鎖及衝突檢測經過,記錄這個key以便事務結束時釋放掉鎖
    // We must track all the locked keys so that we can unlock them later. If
    // the key is already locked, this func will update some stats on the
    // tracked key. It could also update the tracked_at_seq if it is lower than
    // the existing trackey seq.
    TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive);
  }
}

其中ValidateSnapshot就是進行衝突檢測,經過將事務設置的snapshot與key最新的sequence進行比較,若是小於key最新的sequence,則表明設置snapshot後,外部事務修改過這個key,有衝突!獲取key最新的sequence也是簡單粗暴,遍歷memtable,immutable memtable,memtable list history及SST文件來拿。總結以下圖:

圖片描述

GetForUpdate的邏輯和Put差很少,無非就是以Get之名行Put之事(加鎖及衝突檢測),以下圖:

圖片描述

接着介紹下TransactionLockMgr,以下圖:

圖片描述

最外層先是一個std::unordered_map,將每一個ColumnFamily映射到一個LockMap,每一個LockMap默認有16個LockMapStripe,而後每一個LockMapStripe裏包含一個std::unordered_map keys,這就是存放每一個key對應的鎖信息的。因此每次加鎖過程大體以下:

首先經過ThreadLocal拿到lock_maps指針
經過column family ID 拿到對應的LockMap
對key hash映射到某個LockMapStripe,對該LockMapStripe加鎖(同一LockMapStripe下的全部key會搶同一把鎖,粒度略大)
操做LockMapStripe裏的std::unordered_map完成加鎖
3.3 OptimisticTransaction
OptimisticTransactionDB不使用鎖進行key的獨佔,只在commit是進行衝突檢測。因此

OptimisticTransaction::TryLock以下:

Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
                                      const Slice& key, bool read_only,
                                      bool exclusive, bool untracked) {
  if (untracked) {
    return Status::OK();
  }
  uint32_t cfh_id = GetColumnFamilyID(column_family);

  SetSnapshotIfNeeded();
  // 若是設置了以前事務snapshot,這裏使用它做爲key的seq
  // 若是沒有設置snapshot,則以當前全局的sequence做爲key的seq
  SequenceNumber seq;
  if (snapshot_) {
    seq = snapshot_->GetSequenceNumber();
  } else {
    seq = db_->GetLatestSequenceNumber();
  }

  std::string key_str = key.ToString();
  // 記錄這個key及其對應的seq,後期在commit時經過使用這個seq和
  // key當前的最新sequence比較來作衝突檢測
  TrackKey(cfh_id, key_str, seq, read_only, exclusive);

  // Always return OK. Confilct checking will happen at commit time.
  return Status::OK();
}
這裏TryLock實際上就是給key標記一個sequence並記錄,用做commit時的衝突檢測,commit實現以下:

Status OptimisticTransaction::Commit() {
  // Set up callback which will call CheckTransactionForConflicts() to
  // check whether this transaction is safe to be committed.
  OptimisticTransactionCallback callback(this);

  DBImpl* db_impl = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
  // 調用WriteWithCallback進行衝突檢測,若是沒有衝突就寫入DB
  Status s = db_impl->WriteWithCallback(
      write_options_, GetWriteBatch()->GetWriteBatch(), &callback);

  if (s.ok()) {
    Clear();
  }

  return s;
}

衝突檢測的實如今OptimisticTransactionCallback裏,和設置了snapshot的PessimisticTransaction同樣,最終仍是會調用TransactionUtil::CheckKeysForConflicts來檢測,也就是比較sequence。總體以下圖:

圖片描述

3.4 兩階段提交(Two Phase Commit)
在分佈式場景下使用PessimisticTransaction時,咱們可能須要使用兩階段提交(2PC)來確保一個事務在多個節點上執行成功,因此PessimisticTransaction也支持2PC。具體作法也不難,就是將以前commit拆分爲prepare和commit,prepare階段進行WAL的寫入,commit階段進行Memtable的寫入(寫入後其餘事務方可見),因此如今一個事務的操做流程以下:

BeginTransaction
GetForUpdate
Put
...
Prepare
Commit

使用2PC,咱們首先要經過SetName爲一個事務設置惟一的標識並註冊到全局映射表裏,這裏記錄着全部未完成的2PC事務,當Commit後再從映射表裏刪除。

接下來具體2PC實現無非就是在WriteBatch上作文章,經過特殊的標記來控制寫WAL和Memtable,簡單說一下:

正常的WriteBatch結構以下:

Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC一開始的WriteBatch以下:

Sequence(0);NumRecords(0);Noop;
先使用一個Noop佔位,至於爲何,後面再說。緊接着就是一些操做,操做後,WriteBatch以下:

Sequence(0);NumRecords(3);Noop;Put(a,1);Merge(a,1);Delete(a);
而後執行Prepare,寫WAL,在寫WAL以前,先會隊WriteBatch作一些改動,插入Prepare和EndPrepare記錄,以下:

Sequence(0);NumRecords(3);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid)
能夠看到這裏將以前的Noop佔位換成Prepare,而後在結尾插入EndPrepare(xid),構造好WriteBatch後就直接調用WriteImpl寫WAL了。注意,此時往WAL裏寫的這條日誌的sequence雖然比VersionSet的last_sequence大,但寫入WAL以後並不會調用SetLastSequence來更新VersionSet的last_sequence,它只有在最後寫入Memtable以後才更新,具體作法就是給VersionSet除了last_sequence_以外,再加一個last_allocated_sequence_,初始相等,寫WAL是加後者,後者對外不可見,commit後再加前者。因此一旦PessimisticTransactionDB使用了2PC,就要求全部都是2PC,否則last_sequence_可能會錯亂(更正:若是使用two_write_queues_,不論是Prepare -> Commit仍是直接Commit,sequence的增加都是以last_allocated_sequence_爲準,最後用它來調整last_sequence_;若是不使用two_write_queues_則直接以last_sequence_爲準,總之不會出現sequence混錯,因此能夠Prepare -> Commit和Commit混用)。

WAL寫完以後,即便沒有commit就宕機也沒事,重啓後Recovery會將事務從WAL恢復記錄到全局recovered_transaction中,等待Commit

最後就是Commit,Commit階段會使用一個新的CommitTime WriteBatch,和以前的WriteBatch合併整理後最終使用CommitTime WriteBatch寫Memtable

整理後的CommitTime WriteBatch以下:

Sequence(0);NumRecords(3);Commit(xid);
Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid);

將CommitTime WriteBatch的WALTerminalPoint設置到Commit(xid)處,告訴Writer寫WAL時寫到這裏就能夠停了,其實就是隻將Commit記錄寫進WAL(由於其後的記錄在Prepare階段就已經寫到WAL了);

在最後就是MemTableInserter遍歷這個CommitTime WriteBatch向memtable寫入,具體就不說了。寫入成功後,更新VersionSet的last_sequence_,至此,事務成功提交。

  1. WritePrepared & WriteUnprepared

咱們能夠看到不管是Pessimistic仍是Optimistic,都有一個共同缺點,那就是在事務最終Commit以前,因此數據都是緩存在內存(WriteBatch)裏,對於很大的事務來講,這很是耗費內存而且將全部實際寫入壓力都扔給Commit階段來搞,性能有瓶頸,因此RocksDB正在支持WritePolicy爲WritePrepared和WriteUnprepared的PessimisticTransaction,主要思想就是將對Memtable的寫入提早,

若是放到Prepare階段那就是WritePrepared

若是再往前,每次操做直接寫Memtable那就是WriteUnprepared

能夠看到WriteUnprepared不管內存佔用仍是寫入壓力點的分散都作的最好,WritePrepared稍遜。

支持這倆新的WritePolicy的難點在於如何保證寫入到Memtable但還未Commit的數據不被其餘事物看到,這裏就須要在Sequence上大作文章了,目前Rocksdb支持了WritePrepare、而WriteUnprepared還未支持,期待後續...

  1. 隔離級別

看了前面的介紹,這裏就不用展開說了

TransactionDB支持ReadCommitted和RepeatableReads級別的隔離、

原文連接本文爲雲棲社區原創內容,未經容許不得轉載

相關文章
相關標籤/搜索