摘要: RocksDB版本:v5.13.4 1. 概述 得益於LSM-Tree結構,RocksDB全部的寫入並不是是update in-place,因此他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務全部寫操做在內存緩存打包,而後在commit時一次性將WriteBatch寫入,保證了原子,另外經過Sequence和Key鎖來解決衝突實現隔離。緩存
RocksDB版本:v5.13.4併發
得益於LSM-Tree結構,RocksDB全部的寫入並不是是update in-place,因此他支持起來事務的難度也相對較小,主要原理就是利用WriteBatch將事務全部寫操做在內存緩存打包,而後在commit時一次性將WriteBatch寫入,保證了原子,另外經過Sequence和Key鎖來解決衝突實現隔離。app
RocksDB的Transaction分爲兩類:Pessimistic和Optimistic,相似悲觀鎖和樂觀鎖的區別,PessimisticTransaction的衝突檢測和加鎖是在事務中每次寫操做以前作的(commit後釋放),若是失敗則該操做失敗;OptimisticTransaction不加鎖,衝突檢測是在commit階段作的,commit時發現衝突則失敗。分佈式
具體使用時須要結合實際場景來選擇,若是併發事務寫入操做的Key重疊度不高,那麼用Optimistic更合適一些(省掉Pessimistic中額外的鎖操做)oop
介紹實現原理前,先來看一下用法:性能
【1. 基本用法】ui
Options options; TransactionDBOptions txn_db_options; options.create_if_missing = true; TransactionDB* txn_db; // 打開DB(默認Pessimistic) Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db); assert(s.ok()); // 建立一個事務 Transaction* txn = txn_db->BeginTransaction(write_options); assert(txn); // 事務txn讀取一個key s = txn->Get(read_options, "abc", &value); assert(s.IsNotFound()); // 事務txn寫一個key s = txn->Put("abc", "def"); assert(s.ok()); // 經過TransactionDB::Get在事務外讀取一個key s = txn_db->Get(read_options, "abc", &value); // 經過TrasactionDB::Put在事務外寫一個key // 這裏並不會有影響,由於寫的不是"abc",不衝突 // 若是是"abc"的話 // 則Put會一直卡住直到超時或等待事務Commit(本例中會超時) s = txn_db->Put(write_options, "xyz", "zzz"); s = txn->Commit(); assert(s.ok()); // 析構事務 delete txn; delete txn_db; 經過BeginTransaction打開一個事務,而後調用Put、Get等接口進行事務操做,最後調用Commit進行提交。
【2. 回滾】this
... // 事務txn寫入abc s = txn->Put("abc", "def"); assert(s.ok()); // 設置回滾點 txn->SetSavePoint(); // 事務txn寫入cba s = txn->Put("cba", "fed"); assert(s.ok()); // 回滾至回滾點 s = txn->RollbackToSavePoint(); // 提交,此時事務中不包含對cba的寫入 s = txn->Commit(); assert(s.ok()); ...
【3. GetForUpdate】spa
... // 事務txn讀取abc並獨佔該key,確保不被外部事務再修改 s = txn->GetForUpdate(read_options, 「abc」, &value); assert(s.ok()); // 經過TransactionDB::Put接口在事務外寫abc // 不會成功 s = txn_db->Put(write_options, 「abc」, 「value0」); s = txn->Commit(); assert(s.ok()); ...
有時候在事務中須要對某一個key進行先讀後寫,此時則不能在寫時才進行該key的獨佔及衝突檢測操做,因此使用GetForUpdate接口讀取該key並進行獨佔指針
【4. SetSnapshot】
txn = txn_db->BeginTransaction(write_options); // 設置事務txn使用的snapshot爲當前全局Sequence Number txn->SetSnapshot(); // 使用TransactionDB::Put接口在事務外部寫abc // 此時全局Sequence Number會加1 db->Put(write_options, 「key1」, 「value0」); assert(s.ok()); // 事務txn寫入abc s = txn->Put(「abc」, 「value1」); s = txn->Commit(); // 這裏會失敗,由於在事務設置了snapshot以後,事務後來寫的key // 在事務外部有過其餘寫操做,因此這裏不會成功 // Pessimistic會在Put時失敗,Optimistic會在Commit時失敗
前面說過,TransactionDB在事務中須要寫入某個key時纔對其進行獨佔或衝突檢測,有時但願在事務一開始就對其以後全部要寫入的全部key進行獨佔,此時能夠經過SetSnapshot來實現,設置了Snapshot後,外部一旦對事務中將要進行寫操做key作過修改,則該事務最終會失敗(失敗點取決因而Pessimistic仍是Optimistic,Pessimistic由於在Put時就進行衝突檢測,因此Put時就失敗,而Optimistic則會在Commit是檢測到衝突,失敗)
3.1 WriteBatch & WriteBatchWithIndex
WriteBatch就不展開說了,事務會將全部的寫操做追加進同一個WriteBatch,直到Commit時才向DB原子寫入。
WriteBatchWithIndex在WriteBatch以外,額外搞一個Skiplist來記錄每個操做在WriteBatch中的offset等信息。在事務沒有commit以前,數據還不在Memtable中,而是存在WriteBatch裏,若是有須要,這時候能夠經過WriteBatchWithIndex來拿到本身剛剛寫入的但尚未提交的數據。
事務的SetSavePoint和RollbackToSavePoint也是經過WriteBatch來實現的,SetSavePoint記錄當前WriteBatch的大小及統計信息,若干操做以後,若想回滾,則只須要將WriteBatch truncate到以前記錄的大小並恢復統計信息便可。
3.2 PessimisticTransaction
PessimisticTransactionDB經過TransactionLockMgr進行行鎖管理。事務中的每次寫入操做以前都須要TryLock進Key鎖的獨佔及衝突檢測,以Put爲例:
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { // 調用TryLock搶鎖及衝突檢測 Status s = TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { s = GetBatchForWrite()->Put(column_family, key, value); if (s.ok()) { num_puts_++; } } return s; }
能夠看到Put接口定義在TransactionBase中,不管Pessimistic仍是Optimistic的Put都是這段邏輯,兩者的區別是在對TryLock的重載。先看Pessimistic的,TransactionBaseImpl::TryLock經過TransactionBaseImpl::TryLock -> PessimisticTransaction::TryLock -> PessimisticTransactionDB::TryLock -> TransactionLockMgr::TryLock一路調用到TransactionLockMgr的TryLock,在裏面完成對key加鎖,加鎖成功便實現了對key的獨佔,此時直到事務commit以前,其餘事務是沒法修改這個key的。
鎖是加成功了,但這也只能說明今後刻起到事務結束前這個key不會再被外部修改,但若是事務在最開始執行SetSnapshot設置了快照,若是在打快照和Put之間的過程當中外部對相同key進行了修改(並commit),此時已經打破了snapshot的保證,因此事務以後的Put也不能成功,這個衝突檢測也是在PessimisticTransaction::TryLock中作的,以下:
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool skip_validate) { ... // 加鎖 if (!previously_locked || lock_upgrade) { s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive); } SetSnapshotIfNeeded(); ... // 使用事務一開始拿到的snapshot的sequence1與這個key在DB中最新 // 的sequence2進行比較,若是sequence2 > sequence1則表明在snapshot // 以後,外部有對key進行過寫入,有衝突! s = ValidateSnapshot(column_family, key, &tracked_at_seq); if (!s.ok()) { // 檢測到衝突,解鎖 // Failed to validate key if (!previously_locked) { // Unlock key we just locked if (lock_upgrade) { s = txn_db_impl_->TryLock(this, cfh_id, key_str, false /* exclusive */); assert(s.ok()); } else { txn_db_impl_->UnLock(this, cfh_id, key.ToString()); } } } if (s.ok()) { // 若是加鎖及衝突檢測經過,記錄這個key以便事務結束時釋放掉鎖 // We must track all the locked keys so that we can unlock them later. If // the key is already locked, this func will update some stats on the // tracked key. It could also update the tracked_at_seq if it is lower than // the existing trackey seq. TrackKey(cfh_id, key_str, tracked_at_seq, read_only, exclusive); } }
其中ValidateSnapshot就是進行衝突檢測,經過將事務設置的snapshot與key最新的sequence進行比較,若是小於key最新的sequence,則表明設置snapshot後,外部事務修改過這個key,有衝突!獲取key最新的sequence也是簡單粗暴,遍歷memtable,immutable memtable,memtable list history及SST文件來拿。總結以下圖:
GetForUpdate的邏輯和Put差很少,無非就是以Get之名行Put之事(加鎖及衝突檢測),以下圖:
接着介紹下TransactionLockMgr,以下圖:
最外層先是一個std::unordered_map,將每一個ColumnFamily映射到一個LockMap,每一個LockMap默認有16個LockMapStripe,而後每一個LockMapStripe裏包含一個std::unordered_map keys,這就是存放每一個key對應的鎖信息的。因此每次加鎖過程大體以下:
首先經過ThreadLocal拿到lock_maps指針
經過column family ID 拿到對應的LockMap
對key hash映射到某個LockMapStripe,對該LockMapStripe加鎖(同一LockMapStripe下的全部key會搶同一把鎖,粒度略大)
操做LockMapStripe裏的std::unordered_map完成加鎖
3.3 OptimisticTransaction
OptimisticTransactionDB不使用鎖進行key的獨佔,只在commit是進行衝突檢測。因此
OptimisticTransaction::TryLock以下: Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, bool untracked) { if (untracked) { return Status::OK(); } uint32_t cfh_id = GetColumnFamilyID(column_family); SetSnapshotIfNeeded(); // 若是設置了以前事務snapshot,這裏使用它做爲key的seq // 若是沒有設置snapshot,則以當前全局的sequence做爲key的seq SequenceNumber seq; if (snapshot_) { seq = snapshot_->GetSequenceNumber(); } else { seq = db_->GetLatestSequenceNumber(); } std::string key_str = key.ToString(); // 記錄這個key及其對應的seq,後期在commit時經過使用這個seq和 // key當前的最新sequence比較來作衝突檢測 TrackKey(cfh_id, key_str, seq, read_only, exclusive); // Always return OK. Confilct checking will happen at commit time. return Status::OK(); } 這裏TryLock實際上就是給key標記一個sequence並記錄,用做commit時的衝突檢測,commit實現以下: Status OptimisticTransaction::Commit() { // Set up callback which will call CheckTransactionForConflicts() to // check whether this transaction is safe to be committed. OptimisticTransactionCallback callback(this); DBImpl* db_impl = static_cast_with_check<DBImpl, DB>(db_->GetRootDB()); // 調用WriteWithCallback進行衝突檢測,若是沒有衝突就寫入DB Status s = db_impl->WriteWithCallback( write_options_, GetWriteBatch()->GetWriteBatch(), &callback); if (s.ok()) { Clear(); } return s; }
衝突檢測的實如今OptimisticTransactionCallback裏,和設置了snapshot的PessimisticTransaction同樣,最終仍是會調用TransactionUtil::CheckKeysForConflicts來檢測,也就是比較sequence。總體以下圖:
3.4 兩階段提交(Two Phase Commit)
在分佈式場景下使用PessimisticTransaction時,咱們可能須要使用兩階段提交(2PC)來確保一個事務在多個節點上執行成功,因此PessimisticTransaction也支持2PC。具體作法也不難,就是將以前commit拆分爲prepare和commit,prepare階段進行WAL的寫入,commit階段進行Memtable的寫入(寫入後其餘事務方可見),因此如今一個事務的操做流程以下:
BeginTransaction GetForUpdate Put ... Prepare Commit
使用2PC,咱們首先要經過SetName爲一個事務設置惟一的標識並註冊到全局映射表裏,這裏記錄着全部未完成的2PC事務,當Commit後再從映射表裏刪除。
接下來具體2PC實現無非就是在WriteBatch上作文章,經過特殊的標記來控制寫WAL和Memtable,簡單說一下:
正常的WriteBatch結構以下:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC一開始的WriteBatch以下:
Sequence(0);NumRecords(0);Noop;
先使用一個Noop佔位,至於爲何,後面再說。緊接着就是一些操做,操做後,WriteBatch以下:
Sequence(0);NumRecords(3);Noop;Put(a,1);Merge(a,1);Delete(a);
而後執行Prepare,寫WAL,在寫WAL以前,先會隊WriteBatch作一些改動,插入Prepare和EndPrepare記錄,以下:
Sequence(0);NumRecords(3);Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid)
能夠看到這裏將以前的Noop佔位換成Prepare,而後在結尾插入EndPrepare(xid),構造好WriteBatch後就直接調用WriteImpl寫WAL了。注意,此時往WAL裏寫的這條日誌的sequence雖然比VersionSet的last_sequence大,但寫入WAL以後並不會調用SetLastSequence來更新VersionSet的last_sequence,它只有在最後寫入Memtable以後才更新,具體作法就是給VersionSet除了last_sequence_以外,再加一個last_allocated_sequence_,初始相等,寫WAL是加後者,後者對外不可見,commit後再加前者。因此一旦PessimisticTransactionDB使用了2PC,就要求全部都是2PC,否則last_sequence_可能會錯亂(更正:若是使用two_write_queues_,不論是Prepare -> Commit仍是直接Commit,sequence的增加都是以last_allocated_sequence_爲準,最後用它來調整last_sequence_;若是不使用two_write_queues_則直接以last_sequence_爲準,總之不會出現sequence混錯,因此能夠Prepare -> Commit和Commit混用)。
WAL寫完以後,即便沒有commit就宕機也沒事,重啓後Recovery會將事務從WAL恢復記錄到全局recovered_transaction中,等待Commit
最後就是Commit,Commit階段會使用一個新的CommitTime WriteBatch,和以前的WriteBatch合併整理後最終使用CommitTime WriteBatch寫Memtable
整理後的CommitTime WriteBatch以下:
Sequence(0);NumRecords(3);Commit(xid); Prepare();Put(a,1);Merge(a,1);Delete(a);EndPrepare(xid);
將CommitTime WriteBatch的WALTerminalPoint設置到Commit(xid)處,告訴Writer寫WAL時寫到這裏就能夠停了,其實就是隻將Commit記錄寫進WAL(由於其後的記錄在Prepare階段就已經寫到WAL了);
在最後就是MemTableInserter遍歷這個CommitTime WriteBatch向memtable寫入,具體就不說了。寫入成功後,更新VersionSet的last_sequence_,至此,事務成功提交。
咱們能夠看到不管是Pessimistic仍是Optimistic,都有一個共同缺點,那就是在事務最終Commit以前,因此數據都是緩存在內存(WriteBatch)裏,對於很大的事務來講,這很是耗費內存而且將全部實際寫入壓力都扔給Commit階段來搞,性能有瓶頸,因此RocksDB正在支持WritePolicy爲WritePrepared和WriteUnprepared的PessimisticTransaction,主要思想就是將對Memtable的寫入提早,
若是放到Prepare階段那就是WritePrepared
若是再往前,每次操做直接寫Memtable那就是WriteUnprepared
能夠看到WriteUnprepared不管內存佔用仍是寫入壓力點的分散都作的最好,WritePrepared稍遜。
支持這倆新的WritePolicy的難點在於如何保證寫入到Memtable但還未Commit的數據不被其餘事物看到,這裏就須要在Sequence上大作文章了,目前Rocksdb支持了WritePrepare、而WriteUnprepared還未支持,期待後續...
看了前面的介紹,這裏就不用展開說了
TransactionDB支持ReadCommitted和RepeatableReads級別的隔離、
原文連接本文爲雲棲社區原創內容,未經容許不得轉載