本文首發於 泊浮目的簡書: https://www.jianshu.com/u/204...
版本 | 日期 | 備註 |
---|---|---|
1.0 | 2020.3.12 | 文章首發 |
在上篇文章中,咱們簡單提到了Zookeeper的幾個核心點。在這篇文章中,咱們就來探索其存儲技術。在開始前,讀者能夠考慮思考下列問題:java
抱着問題,咱們進入下面的內容。node
衆所周知,Zookeeper不擅長大量數據的讀寫、頻繁的讀操做。這與其存儲模型息息相關——典型的LSM( Log-Structured Merge-Tree)。數據庫
在這裏簡單的介紹LSM模型。該模型於1996年發明,2006年從Bigtable開始纔開始收到關注。咱們能夠直譯爲日誌結構的合併樹,這樣能夠猜出這個數據結構的大概:以日誌爲基礎,不停的append,並組織成可合併的樹形式。apache
從圖中能夠看出來,這是一個多層結構。C0在內存裏,其他幾層都在磁盤中。當C0到達必定大小時,就會向下作Compaction。segmentfault
C0通常咱們稱爲MTable(MemoryTable),用來保存有序KV對的內存緩衝區。api
而磁盤中的Ck咱們稱爲SSTable(Sorted String Table),用於保存有序KV對的只讀文件。服務器
LSM存儲MVCC的key-value。每次更新一個key-value都會生成一個新版本,刪除一個key-value都會生成tomstone的新版本。session
前面提到了Log-Structured,其表如今數據在寫入前,會作WAL(Write Ahead Logging)。這是普遍使用的保證多Block數據寫入原子性的技術。通常在Block寫入以前,會先把新數據寫到一個日誌中。只有寫入END並調用Sync API,纔開始對Block開始寫入。若是在對block進行寫入的任什麼時候發生crash,均可以在重啓時使用WAL裏面的數據完成Block的寫入。數據結構
另外,經過WAL,咱們在提交一個操做以前只須要進行文件的順序寫入,從而減小了包含多Block文件操做的數據寫入延時。架構
典型基於LSM的存儲引擎有:
至此,寫操做就完成了。讀者可能會問萬一碰上Compaction呢?這是在後臺作的,並不阻塞寫入。
咱們以存儲引擎放大指標(Amplification Factors)來對兩個存儲模型進行對比,在這裏,簡單介紹一下幾個指標:
根據指標,咱們能夠做出一個簡單的對比:
LSM | B+Tree | |
---|---|---|
讀放大 | 一個讀操做要對多個Level上的SSTable進行讀操做 | 一個Key-value的讀操做涉及一個數據頁的讀操做,若干個索引頁的讀操做 |
寫放大 | 一個key-value值的寫操做要在多級的SSTable上進行 | 一個key-value的寫操做涉及數據頁的寫操做,若干個索引頁的寫操做 |
空間放大 | 在SSTable中存儲一個key-value的多個版本 | 索引頁和gragmentation |
LSM和B+Tree在存儲性能上的比較:
上面提到了LSM的優缺點,接下來,咱們來談一談常見的優化思路和Zk中採用的方案。
通常的WAL中每次寫完END都要調用一次耗時的Sync API,這實際上是會影響到系統的性能。爲了解決這個問題,咱們能夠一次提交多個數據寫入——只在最後一個數據寫入的END日誌以後,才調用Sync API。like this:
BEGIN Data1 END Sync
BEGIN Data2 END Sync
BEGIN Data3 END Sync
BEGIN Data1 END
BEGIN Data2 END
BEGIN Data3 END Sync`凡事都有代價,這可能會引發數據一致性相關的問題。
在往 WAL 裏面追加日誌的時候,若是當前的文件 block 不能保存新添加的日誌,就要爲文件分配新的 block,這要更新文件 inode 裏面的信息(例如 size)。若是咱們使用的是 HHD 的話,就要先 seek 到 inode 所在的位置,而後回到新添加 block 的位置進行日誌追加。爲了減小這些 seek,咱們能夠預先爲 WAL 分配 block。例如 ZooKeeper 就是每次爲 WAL 分配 64MB 的 block。
因此這也是Zookeeper不擅長讀寫大數據的緣由之一,這會引發大量的Block分配。
若是咱們使用一個內存數據結構加 WAL 的存儲方案,WAL 就會一直增加。這樣在存儲系統啓動的時候,就要讀取大量的 WAL 日誌數據來重建內存數據。快照能夠解決這個問題。
除了解決啓動時間過長的問題以外,快照還能夠減小存儲空間的使用。WAL 的多個日誌條目有多是對同一個數據的改動,經過快照,就能夠只保留最新的數據改動(Merge)。
Zk的確採用了這個方案來作優化。還帶來的一個好處是:在一個節點加入時,就會傳最新的Snapshot過去同步數據。
經過Bloom過濾器來加速查找——這是一種隨機的數據結構,能夠在O(1)的時間內判斷一個給定的元素是否在集合中。注意:Bloom Filiter是可能產生Flase positive的(元素可能不在集合中)。
你能夠將Bloom過濾器理解爲一個哈希字典,可是真實的Bloom過濾器會比哈希字典複雜點,但這樣想並不影響你去理解它。
這麼看起來,在最壞的狀況下,Bloom Filter彷佛沒什麼用。
若是咱們一直寫入MTable,那麼MTable則會一直增大,直到超出服務器內部限制。因此咱們須要把MTable的內存數據放到Durable Storage 上去,生成 SSTable 文件,這個操做叫作 minor Compaction。另外,還有兩類常見的Compaction:
Compaction的好處:
本節內容都以3.5.7版本爲例
TxnLog是咱們前面提到的事務日誌。那麼接下來咱們就來看它的相關源碼。
先看註釋:
package org.apache.zookeeper.server.persistence; import ... /** * This class implements the TxnLog interface. It provides api's * to access the txnlogs and add entries to it. * <p> * The format of a Transactional log is as follows: * <blockquote><pre> * LogFile: * FileHeader TxnList ZeroPad * * FileHeader: { * magic 4bytes (ZKLG) * version 4bytes * dbid 8bytes * } * * TxnList: * Txn || Txn TxnList * * Txn: * checksum Txnlen TxnHeader Record 0x42 * * checksum: 8bytes Adler32 is currently used * calculated across payload -- Txnlen, TxnHeader, Record and 0x42 * * Txnlen: * len 4bytes * * TxnHeader: { * sessionid 8bytes * cxid 4bytes * zxid 8bytes * time 8bytes * type 4bytes * } * * Record: * See Jute definition file for details on the various record types * * ZeroPad: * 0 padded to EOF (filled during preallocation stage) * </pre></blockquote> */ public class FileTxnLog implements TxnLog, Closeable {
在註釋中,咱們能夠看到一個FileLog由三部分組成:
關於FileHeader,能夠理解其爲一個標示符。TxnList則爲主要內容。ZeroPad是一個終結符。
咱們來看看最典型的append方法,能夠將其理解WAL過程當中的核心方法額:
/** * append an entry to the transaction log * @param hdr the header of the transaction * @param txn the transaction part of the entry * returns true iff something appended, otw false */ public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr == null) { //爲null意味着這是一個讀請求,直接返回 return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } else { lastZxidSeen = hdr.getZxid(); } if (logStream==null) { //爲空的話則new一個Stream if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); //寫file header // Make sure that the magic number is written before padding. logStream.flush(); // zxid必須比日誌先落盤 filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); //加入須要Flush的隊列 } filePadding.padFile(fos.getChannel()); //擴容。每次64m擴容 byte[] buf = Util.marshallTxnEntry(hdr, txn); //序列化寫入 if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } //生成butyArray的checkSum Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC");//寫入日誌裏 Util.writeTxnBytes(oa, buf); return true; }
這裏有個zxid(ZooKeeper Transaction Id),有點像MySQL的GTID。每次對Zookeeper的狀態的改變都會產生一個zxid,zxid是全局有序的,若是zxid1小於zxid2,則zxid1在zxid2以前發生。
這個方法被調用的時機大體有:
/** * commit the logs. make sure that everything hits the * disk */ public synchronized void commit() throws IOException { if (logStream != null) { logStream.flush(); } for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { long startSyncNS = System.nanoTime(); FileChannel channel = log.getChannel(); channel.force(false);//對應fdataSync syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS); if (syncElapsedMS > fsyncWarningThresholdMS) { if(serverStats != null) { serverStats.incrementFsyncThresholdExceedCount(); } LOG.warn("fsync-ing the write ahead log in " + Thread.currentThread().getName() + " took " + syncElapsedMS + "ms which will adversely effect operation latency. " + "File size is " + channel.size() + " bytes. " + "See the ZooKeeper troubleshooting guide"); } } } while (streamsToFlush.size() > 1) { streamsToFlush.removeFirst().close(); } }
代碼很是的簡單。若是logStream還有,那就先刷下去。而後遍歷待flush的隊列(是個鏈表,用來保持操做順序),同時還會關注寫入的時間,若是過長,則會打一個Warn的日誌。
DataTree是Zk的內存數據結構——就是咱們以前說到的MTable。它以樹狀結構來組織DataNode。
這麼聽起來可能有點雲裏霧裏,不妨直接看一下DataNode的相關代碼。
public class DataNode implements Record { /** the data for this datanode */ byte data[]; /** * the acl map long for this datanode. the datatree has the map */ Long acl; /** * the stat for this node that is persisted to disk. */ public StatPersisted stat; /** * the list of children for this node. note that the list of children string * does not contain the parent path -- just the last part of the path. This * should be synchronized on except deserializing (for speed up issues). */ private Set<String> children = null; ..... }
若是用過ZkClient的小夥伴,可能很是熟悉。這就是咱們根據一個path獲取數據時返回的相關屬性——這就是用來描述存儲數據的一個類。注意,DataNode還會維護它的Children。
簡單瞭解DataNode後,咱們來看一下DataTree。爲了不干擾,咱們選出最關鍵的成員變量:
public class DataTree { private static final Logger LOG = LoggerFactory.getLogger(DataTree.class); /** * This hashtable provides a fast lookup to the datanodes. The tree is the * source of truth and is where all the locking occurs */ private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>(); private final WatchManager dataWatches = new WatchManager(); private final WatchManager childWatches = new WatchManager(); /** * This hashtable lists the paths of the ephemeral nodes of a session. */ private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>(); ....... }
咱們能夠看到,DataTree本質上是經過一個ConcurrentHashMap來存儲DataNode的(臨時節點也是)。保存的是 DataNode 的 path 到 DataNode 的映射。
那爲何要保存兩個狀態呢?這得看調用它們被調用的場景:
若是須要獲取全部節點的信息,顯然遍歷樹會比一個個從ConcurrentHashMap 拿快。
接下來看一下序列化的相關代碼:
/** * this method uses a stringbuilder to create a new path for children. This * is faster than string appends ( str1 + str2). * * @param oa * OutputArchive to write to. * @param path * a string builder. * @throws IOException * @throws InterruptedException */ void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { String pathString = path.toString(); DataNode node = getNode(pathString); if (node == null) { return; } String children[] = null; DataNode nodeCopy; synchronized (node) { StatPersisted statCopy = new StatPersisted(); copyStatPersisted(node.stat, statCopy); //we do not need to make a copy of node.data because the contents //are never changed nodeCopy = new DataNode(node.data, node.acl, statCopy); Set<String> childs = node.getChildren(); children = childs.toArray(new String[childs.size()]); } serializeNodeData(oa, pathString, nodeCopy); path.append('/'); int off = path.length(); for (String child : children) { // since this is single buffer being resused // we need // to truncate the previous bytes of string. path.delete(off, Integer.MAX_VALUE); path.append(child); serializeNode(oa, path); } }
能夠看到,的確是經過DataNode的Children來遍歷全部節點。
接下來看一下反序列化的代碼:
public void deserialize(InputArchive ia, String tag) throws IOException { aclCache.deserialize(ia); nodes.clear(); pTrie.clear(); String path = ia.readString("path"); while (!"/".equals(path)) { DataNode node = new DataNode(); ia.readRecord(node, "node"); nodes.put(path, node); synchronized (node) { aclCache.addUsage(node.acl); } int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1) { root = node; } else { String parentPath = path.substring(0, lastSlash); DataNode parent = nodes.get(parentPath); if (parent == null) { throw new IOException("Invalid Datatree, unable to find " + "parent " + parentPath + " of path " + path); } parent.addChild(path.substring(lastSlash + 1)); long eowner = node.stat.getEphemeralOwner(); EphemeralType ephemeralType = EphemeralType.get(eowner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (eowner != 0) { HashSet<String> list = ephemerals.get(eowner); if (list == null) { list = new HashSet<String>(); ephemerals.put(eowner, list); } list.add(path); } } path = ia.readString("path"); } nodes.put("/", root); // we are done with deserializing the // the datatree // update the quotas - create path trie // and also update the stat nodes setupQuota(); aclCache.purgeUnused(); }
由於序列化的時候是前序遍歷。因此反序列化時是先反序列化父親節點,再反序列化孩子節點。
那麼DataTree在什麼狀況下會序列化呢?在這裏就要提到快照了。
前面提到過:若是咱們使用一個內存數據結構加 WAL 的存儲方案,WAL 就會一直增加。這樣在存儲系統啓動的時候,就要讀取大量的 WAL 日誌數據來重建內存數據。快照能夠解決這個問題。
除了減小WAL日誌,Snapshot還會在Zk全量同步時被用到——當一個全新的ZkServer(這個通常叫Learner)被加入集羣時,Leader服務器會將本機上的數據全量同步給新來的ZkServer。
接下來看一下代碼入口:
/** * serialize the datatree and session into the file snapshot * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot) throws IOException { if (!close) { try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot)); CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); serialize(dt, sessions, oa, header); long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); sessOS.flush(); } } else { throw new IOException("FileSnap has already been closed"); } }
JavaIO的基礎知識在這再也不介紹,有興趣的人能夠自行查閱資料或看 從一段代碼談起——淺談JavaIO接口。
本質就是建立文件,並調用DataTree的序列化方法,DataTree的序列化其實就是遍歷DataNode去序列化,最後將這些序列化的內容寫入文件。
/** * deserialize a data tree from the most recent snapshot * @return the zxid of the snapshot */ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot " + snap); try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap)); CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) { InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file " + snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
簡單來講,先讀取Snapshot文件們。並反序列化它們,組成DataTree。
在本文中,筆者和你們一塊兒學習了Zk的底層存儲技術。另外提一下,Zk中序列化技術用的是Apache Jute——本質上調用了JavaDataOutput和Input,較爲簡單。故沒在本文中展開。