微軟支持併發的Key-Value 存儲庫有C++與C#兩個版本。號稱迄今爲止最快的併發鍵值存儲。下面是C#版本翻譯:css
FASTER C#可在.NET Framework和.NET Core中運行,而且能夠在單線程和併發設置中使用。通過測試,能夠在Windows和Linux上使用。它公開了一種API,該API能夠執行讀取,盲更新(Upserts)和讀取-修改-寫入(RMW)操做的混合。它支持大於內存的數據,並接受IDevice將日誌存儲在文件中的實現。提供了IDevice本地文件系統的實現,也能夠寫入遠程文件系統。或者將遠程存儲映射到本地文件系統中。FASTER能夠用做傳統併發數據結構相似ConcurrentDictionary的高性能替代品,而且還支持大於內存的數據。它支持增量或非增量數據結構類型的檢查點。git
FASTER支持三種基本操做:github
Read:從鍵值存儲中讀取數據bash
Upsert:將值盲目向上插入到存儲中(不檢查先前的值)session
Read-Modify-Write:更新存儲區中的值,用於實現「求和」和「計數」之類的操做。數據結構
在實例化FASTER以前,您須要建立FASTER將使用的存儲設備。若是使用的是可移植類型(byte、int、double)類型,則僅須要混合日誌設備。若是使用對象,則須要建立一個單獨的對象日誌設備。併發
IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");
而後,按以下方式建立一個FASTER實例:異步
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions> (1L << 20, new Functions(), new LogSettings { LogDevice = log });
有六個基本概念,在實例化FASTER時做爲通用類型參數提供:ide
Key:這是鍵的類型,例如long。函數
Value:這是存儲在FASTER中的值的類型。
Input:這是調用Read或RMW時提供給FASTER的輸入類型。它能夠被視爲讀取或RMW操做的參數。例如,對於RMW,但是增量累加到值。
Output:這是讀操做的輸出類型,將值的相關部分複製到輸出。
Context:操做的用戶定義上下文,若是沒有必要使用Empty。
Functions:須要回調時,使用IFunctions<>調用。
用戶提供一個實例化IFunctions<>。此類型封裝了全部回調,下面將對其進行介紹:
SingleReader和併發讀ConcurrentReader:這些用於讀取存儲值並將它們複製到Output。單個讀取器能夠假定沒有併發操做。
SingleWriter和ConcurrentWriter:這些用於將值從源值寫入存儲。
Completion callbacks完成回調:各類操做完成時調用。
RMWUpdaters:用戶指定了三個更新器,InitialUpdater,InPlaceUpdater和CopyUpdater。它們一塊兒用於實現RMW操做。
Hash Table Siz哈希表大小:這是分配給FASTER的存儲行數,其中每一個行爲64字節。
LogSettings 日誌設置:這些設置與日誌的大小、設備。
Checkpoint設置:這些是與檢查相關的設置,例如檢查類型和文件夾。
Serialization序列化設置:用於爲鍵和值類型提供自定義序列化程序。序列化程序實現IObjectSerializer<Key>鍵和IObjectSerializer<Value>值。只有C#類對象非可移植類型才須要這些。
Key比較器:用於爲key提供更好的比較器IFasterEqualityComparer<Key>。
FASTER的總內存佔用量由如下參數控制:
哈希表大小:此參數(第一個構造函數參數)乘以64是內存中哈希表的大小(以字節爲單位)。
日誌大小:logSettings.MemorySizeBits表示混合日誌的內存部分的大小(以位爲單位)。換句話說對於參數設置B,日誌的大小爲2 ^ B字節。若是日誌指向類對象,則此大小不包括對象的大小,由於FASTER沒法訪問此信息。日誌的較舊部分溢出到存儲中。
實例化FASTER以後,線程可使用Session來使用FASTER
fht.StartSession();fht.StopSession();
當全部線程都在FASTER上完成操做後,您最終銷燬FASTER實例:
fht.Dispose();
如下是一個簡單示例,其中全部數據都在內存中,所以咱們沒必要擔憂掛起的I / O操做。在此示例中也沒有檢查點。
public static void Test(){ var log = Devices.CreateLogDevice("C:\\Temp\\hlog.log"); var fht = new FasterKV<long, long, long, long, Empty, Funcs> (1L << 20, new Funcs(), new LogSettings { LogDevice = log }); fht.StartSession(); long key = 1, value = 1, input = 10, output = 0; fht.Upsert(ref key, ref value, Empty.Default, 0); fht.Read(ref key, ref input, ref output, Empty.Default, 0); Debug.Assert(output == value); fht.RMW(ref key, ref input, Empty.Default, 0); fht.RMW(ref key, ref input, Empty.Default, 0); fht.Read(ref key, ref input, ref output, Empty.Default, 0); Debug.Assert(output == value + 20); fht.StopSession(); fht.Dispose(); log.Close();}
此示例的函數:
public class Funcs : IFunctions<long, long, long, long, Empty>{ public void SingleReader(ref long key, ref long input, ref long value, ref long dst) => dst = value; public void SingleWriter(ref long key, ref long src, ref long dst) => dst = src; public void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst) => dst = value; public void ConcurrentWriter(ref long key, ref long src, ref long dst) => dst = src; public void InitialUpdater(ref long key, ref long input, ref long value) => value = input; public void CopyUpdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input; public void InPlaceUpdater(ref long key, ref long input, ref long value) => value += input; public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { } public void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status s) { } public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { } public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }}
FASTER支持基於檢查點的恢復。每一個新的檢查點都會保留(或使之持久)其餘用戶操做(讀取,更新或RMW)。FASTER容許客戶端線程跟蹤已持久的操做和未使用基於會話的API的操做。
回想一下,每一個FASTER線程都會啓動一個與惟一的Guid相關聯的會話。全部FASTER線程操做(讀取,Upsert,RMW)都帶有單調序列號。在任什麼時候間點,均可以調用Checkpoint以啓動FASTER的異步檢查點。在調用以後Checkpoint,(最終)向每一個FASTER線程通知一個序列號,這樣能夠確保直到該序列號以前的全部操做以及在該序列號以後沒有任何操做被保留爲該檢查點的一部分。FASTER線程可使用此序列號來清除等待執行的操做的任何內存緩衝區。
在恢復期間,線程可使用繼續使用相同的Guid進行會話ContinueSession。該函數返回線程本地序列號,直到恢復該會話哈希爲止。從那時起,新線程可使用此信息來重播全部未提交的操做。
下面一個單線程的簡單恢復示例。
public class PersistenceExample{ private FasterKV<long, long, long, long, Empty, Funcs> fht; private IDevice log; public PersistenceExample() { log = Devices.CreateLogDevice("C:\\Temp\\hlog.log"); fht = new FasterKV<long, long, long, long, Empty, Funcs> (1L << 20, new Funcs(), new LogSettings { LogDevice = log }); } public void Run() { IssuePeriodicCheckpoints(); RunSession(); } public void Continue() { fht.Recover(); IssuePeriodicCheckpoints(); ContinueSession(); } /* Helper Functions */ private void RunSession() { Guid guid = fht.StartSession(); System.IO.File.WriteAllText(@"C:\\Temp\\session1.txt", guid.ToString()); long seq = 0; // sequence identifier long key = 1, input = 10; while(true) { key = (seq % 1L << 20); fht.RMW(ref key, ref input, Empty.Default, seq); seq++; } // fht.StopSession() - outside infinite loop } private void ContinueSession() { string guidText = System.IO.File.ReadAllText(@"C:\\Temp\session1.txt"); Guid sessionGuid = Guid.Parse(guidText); long seq = fht.ContinueSession(sessionGuid); // recovered seq identifier seq++; long key = 1, input = 10; while(true) { key = (seq % 1L << 20); fht.RMW(ref key, ref input, Empty.Default, seq); seq++; } } private void IssuePeriodicCheckpoints() { var t = new Thread(() => { while(true) { Thread.Sleep(10000);fht.StartSession(); fht.TakeCheckpoint(out Guid token); fht.CompleteCheckpoint(token, true);fht.StopSession(); } }); t.Start(); }}
FASTER支持兩種檢查點概念:「快照」和「摺疊」。前者是將內存中的完整快照複製到一個單獨的快照文件中,然後者是自上一個檢查點以來更改的增量檢查點。摺疊有效地將混合日誌的只讀標記移到尾部,所以全部數據都做爲同一混合日誌的一部分保留(沒有單獨的快照文件)。全部後續更新均寫入新的混合日誌尾部位置,這使Fold-Over具備增量性質。
項目路徑:
https://github.com/Microsoft/FASTER/tree/master/cs