This article was first published on 泊浮目's Jianshu blog: https://www.jianshu.com/u/204...
Version | Date | Notes |
1.0 | 2020.3.12 | First published |
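As is well known, ZooKeeper is not good at reading and writing large amounts of data, or at handling frequent read operations. This is closely tied to its storage model, which resembles a typical LSM (Log-Structured Merge-Tree).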
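In the LSM model, C0 is the in-memory component, while the on-disk components Ck are called SSTables (Sorted String Tables): read-only files that store sorted key-value pairs.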
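The "Log-Structured" part shows up in the fact that data is written to a WAL (Write-Ahead Log) before it is written anywhere else. WAL is a widely used technique for making writes that span multiple blocks atomic: before the blocks are written, the new data is first appended to a log, and only after the END record has been written and the Sync API has been called does writing to the blocks begin. If a crash happens at any point while the blocks are being written, the data in the WAL can be used on restart to finish writing them.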
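The BEGIN/END/Sync protocol just described can be sketched as follows. This is a minimal illustration, assuming a toy text format of our own (one line per record, keys without spaces) and a hypothetical class `SimpleWal`; it is not ZooKeeper's on-disk format. The in-memory `blocks` map stands in for the real data blocks.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy WAL: a transaction is "BEGIN", one "PUT key value" line per block write, then "END".
// Blocks are only updated after the END record has been synced to disk.
class SimpleWal {
    private final Path logFile;
    final Map<String, String> blocks = new HashMap<>(); // stands in for the data blocks

    SimpleWal(Path logFile) { this.logFile = logFile; }

    void commit(Map<String, String> writes) throws IOException {
        StringBuilder sb = new StringBuilder("BEGIN\n");
        writes.forEach((k, v) -> sb.append("PUT ").append(k).append(' ').append(v).append('\n'));
        sb.append("END\n");
        try (FileChannel ch = FileChannel.open(logFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ch.write(ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8)));
            ch.force(false); // the "Sync" step: the log is durable before any block is touched
        }
        blocks.putAll(writes); // only now apply the writes to the blocks
    }

    // On restart, redo every transaction whose END record reached the log;
    // a trailing transaction without END (crash mid-write) is ignored.
    void recover() throws IOException {
        if (!Files.exists(logFile)) return;
        List<String> lines = Files.readAllLines(logFile, StandardCharsets.UTF_8);
        Map<String, String> pending = null;
        for (String line : lines) {
            if (line.equals("BEGIN")) {
                pending = new HashMap<>();
            } else if (line.equals("END")) {
                if (pending != null) blocks.putAll(pending);
                pending = null;
            } else if (line.startsWith("PUT ") && pending != null) {
                String[] parts = line.split(" ", 3);
                if (parts.length == 3) pending.put(parts[1], parts[2]); // skip a torn line
            }
        }
    }
}
```

Replaying is safe to repeat because each PUT simply overwrites a block with the same value, so recovering twice gives the same result.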
We compare the two storage models, LSM and B+Tree, using storage-engine amplification factors. Here is a brief overview of these metrics:
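Metric | LSM | B+Tree |
Read amplification | A read may have to read SSTables on several levels | A key-value read involves reading one data page plus several index pages |
Write amplification | A key-value write is carried out on multiple levels of SSTables (through compaction) | A key-value write involves writing one data page plus several index pages |
Space amplification | Multiple versions of the same key-value pair may be stored across SSTables | Index pages and fragmentation |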
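For reference, the three factors are commonly defined as ratios. These are the usual textbook definitions, not something specific to ZooKeeper:

```latex
\begin{aligned}
\text{Read amplification}  &= \frac{\text{disk reads performed}}{\text{logical reads requested}} \\
\text{Write amplification} &= \frac{\text{bytes written to storage}}{\text{bytes written by the application}} \\
\text{Space amplification} &= \frac{\text{bytes occupied on storage}}{\text{bytes of live data}}
\end{aligned}
```

As a made-up example: if a 1 KB key-value pair is first flushed to an SSTable and later rewritten by compaction at three more levels, 4 KB reach the disk, so the write amplification is 4 KB / 1 KB = 4 (not counting the WAL write).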
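A basic WAL calls the expensive Sync API after every END record, which hurts system performance. To avoid this, several writes can be committed together, calling Sync only once, after the END record of the last write. Instead of

    BEGIN Data1 END Sync
    BEGIN Data2 END Sync
    BEGIN Data3 END Sync

we write

    BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync

Everything has a price, though: this can lead to data-consistency problems, because none of the batched writes is durable until that final Sync completes, and a crash before it loses the whole batch.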
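A minimal sketch of this group-commit idea, assuming a hypothetical `GroupCommitLog` that buffers entries in `append()` and writes and syncs them in `commit()` with a single `FileChannel.force` call. It counts sync calls so the saving is visible. ZooKeeper's `SyncRequestProcessor` batches appends before calling `commit()` in a similar spirit, but real implementations also bound the batch by size and time, which is left out here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical group-commit log: append() only buffers; commit() writes the whole batch
// (BEGIN Data1 END BEGIN Data2 END ...) and then syncs exactly once.
class GroupCommitLog implements AutoCloseable {
    private final FileChannel channel;
    private final List<String> batch = new ArrayList<>();
    int syncCount = 0; // number of force() calls so far

    GroupCommitLog(Path file) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    void append(String data) {
        batch.add("BEGIN " + data + " END\n");
    }

    // Nothing appended since the previous commit() is durable until this returns.
    void commit() throws IOException {
        if (batch.isEmpty()) return;
        for (String entry : batch) {
            channel.write(ByteBuffer.wrap(entry.getBytes(StandardCharsets.UTF_8)));
        }
        channel.force(false); // one Sync for the whole batch
        syncCount++;
        batch.clear();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Three appends followed by one `commit()` cost one sync instead of three; the flip side is exactly the risk described above, since a crash before `commit()` loses all three entries.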
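When appending to the WAL, if the file's current block cannot hold the new entry, a new block has to be allocated to the file, which means updating the information in the file's inode (its size, for example). On an HDD, this requires seeking to the inode and then seeking back to the newly added block to append the entry. To cut down on these seeks, blocks can be preallocated for the WAL; ZooKeeper, for example, preallocates WAL space 64MB at a time.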
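The sketch below shows how the next preallocated size could be computed. It is modeled on the idea behind ZooKeeper's `FilePadding` (grow the file by one chunk once the write position comes within 4KB of the current end), but the class `Prealloc` and its method names are our own, and the real code may differ in detail.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical helper illustrating WAL preallocation.
class Prealloc {
    static final long PREALLOC_SIZE = 64L * 1024 * 1024; // 64MB, the chunk size mentioned above

    // Returns the file size we want, given the current write position and file size.
    static long nextFileSize(long position, long currentSize, long preallocSize) {
        if (preallocSize > 0 && position + 4096 >= currentSize) {
            if (position > currentSize) {
                // already written past the preallocated area: round up to a chunk boundary
                long size = position + preallocSize;
                return size - size % preallocSize;
            }
            return currentSize + preallocSize; // grow by one more chunk
        }
        return currentSize; // still enough room, nothing to do
    }

    // Extends the file by writing one zero byte at the new last offset.
    // A positional write does not move the channel's own position.
    // Note: on file systems with sparse-file support this may reserve the size
    // without physically allocating every block.
    static long padFile(FileChannel ch, long currentSize) throws IOException {
        long newSize = nextFileSize(ch.position(), currentSize, PREALLOC_SIZE);
        if (newSize != currentSize) {
            ch.write(ByteBuffer.allocate(1), newSize - 1);
        }
        return newSize;
    }
}
```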
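If we use an in-memory data structure plus a WAL as the storage scheme, the WAL keeps growing. At startup, the storage system then has to read a large amount of WAL data to rebuild the in-memory data. Snapshots solve this problem.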
Besides fixing slow startup, snapshots also reduce storage usage. Several WAL entries may modify the same piece of data; a snapshot keeps only the latest version of each (Merge).
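A small sketch of that Merge effect: replaying a WAL into a map yields a snapshot that keeps only the latest value per key. `LogEntry` and `Snapshotter.takeSnapshot` are illustrative names, not ZooKeeper APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative WAL entry: "set key to value".
record LogEntry(String key, String value) {}

class Snapshotter {
    // Replays the log in order; later entries overwrite earlier ones,
    // so the snapshot holds one (the latest) value per key.
    static Map<String, String> takeSnapshot(List<LogEntry> wal) {
        Map<String, String> state = new LinkedHashMap<>();
        for (LogEntry e : wal) {
            state.put(e.key(), e.value());
        }
        return state;
    }
}
```

Four entries touching two keys collapse into two snapshot entries. Once the snapshot is on disk, startup can load it and replay only the WAL entries written after it.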
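Lookups can be accelerated with a Bloom filter, a probabilistic data structure that can tell in O(1) time (more precisely, O(k) for k hash functions) whether a given element is in a set. Note that a Bloom filter can produce false positives: it may report an element as present when it is not. It never produces false negatives.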
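In the worst case, then, the Bloom filter seems to be of little use: if every lookup is a false positive, the SSTables still have to be read anyway.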
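A minimal Bloom filter sketch using `java.util.BitSet`, deriving k probe positions from two hashes (double hashing, a common trick). The bit-array size and k are arbitrary here; a real SSTable filter would size them from the expected key count and target false-positive rate.

```java
import java.util.BitSet;

// Minimal Bloom filter: k bit positions per element; no false negatives,
// but false positives are possible.
class BloomFilter {
    private final BitSet bits;
    private final int size;
    private final int k;

    BloomFilter(int size, int k) {
        this.bits = new BitSet(size);
        this.size = size;
        this.k = k;
    }

    // i-th probe position via double hashing: h1 + i * h2 (mod size).
    private int probe(String s, int i) {
        int h1 = s.hashCode();
        int h2 = ((h1 * 0x9E3779B1) >>> 1) | 1; // a second, odd (never zero) step
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String s) {
        for (int i = 0; i < k; i++) bits.set(probe(s, i));
    }

    // false => definitely absent, so this SSTable can be skipped;
    // true  => possibly present, so the SSTable must still be read.
    boolean mightContain(String s) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(probe(s, i))) return false;
        }
        return true;
    }
}
```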
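If we keep writing to the MTable (the in-memory table), it keeps growing until it exceeds the server's memory limit. So the MTable's data must be written out to durable storage as an SSTable file; this operation is called minor compaction. There are two other common kinds of compaction: merging compaction, which merges the in-memory table and a few SSTables into a new SSTable, and major compaction, which rewrites all SSTables into a single one and discards deleted data.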
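A toy sketch of minor and major compaction, assuming immutable in-memory `SortedMap`s stand in for on-disk SSTable files and a hypothetical entry-count threshold triggers the flush. Deletes (tombstones) are not modeled. Lookups check the memtable first and then the SSTables from newest to oldest, which is where the LSM read amplification in the table above comes from.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy LSM store: a mutable memtable plus a list of immutable, sorted "SSTables".
class TinyLsm {
    private final int memtableLimit;
    private TreeMap<String, String> memtable = new TreeMap<>();
    final List<SortedMap<String, String>> sstables = new ArrayList<>(); // oldest first

    TinyLsm(int memtableLimit) { this.memtableLimit = memtableLimit; }

    void put(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= memtableLimit) minorCompaction();
    }

    // Minor compaction: freeze the memtable into a read-only sorted table, start a new one.
    void minorCompaction() {
        if (memtable.isEmpty()) return;
        sstables.add(Collections.unmodifiableSortedMap(memtable));
        memtable = new TreeMap<>();
    }

    // Major compaction: merge all tables into one, keeping only the newest version of each key.
    void majorCompaction() {
        TreeMap<String, String> merged = new TreeMap<>();
        for (SortedMap<String, String> t : sstables) merged.putAll(t); // newer tables overwrite older
        sstables.clear();
        sstables.add(Collections.unmodifiableSortedMap(merged));
    }

    String get(String key) {
        String v = memtable.get(key);
        if (v != null) return v;
        for (int i = sstables.size() - 1; i >= 0; i--) { // newest first
            v = sstables.get(i).get(key);
            if (v != null) return v;
        }
        return null;
    }
}
```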
ZooKeeper's WAL, the transaction log, is implemented by FileTxnLog, whose Javadoc describes the on-disk format:

```java
package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {
    // ...
}
```
```java
/**
 * append an entry to the transaction log
 * @param hdr the header of the transaction
 * @param txn the transaction part of the entry
 * returns true iff something appended, otw false
 */
public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException {
    if (hdr == null) {
        // null means this is a read request: nothing to log
        return false;
    }
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("Current zxid " + hdr.getZxid()
                + " is <= " + lastZxidSeen + " for "
                + hdr.getType());
    } else {
        lastZxidSeen = hdr.getZxid();
    }
    if (logStream == null) {
        // no open log file yet: create one named after this zxid
        if (LOG.isInfoEnabled()) {
            LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
        }

        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
        fhdr.serialize(oa, "fileheader"); // write the file header
        // Make sure that the magic number is written before padding.
        logStream.flush(); // push the buffered header into fos so the channel position below includes it
        filePadding.setCurrentSize(fos.getChannel().position());
        streamsToFlush.add(fos); // queue this stream to be flushed and synced in commit()
    }
    filePadding.padFile(fos.getChannel()); // preallocate: grow the file by 64MB (default) when nearly full
    byte[] buf = Util.marshallTxnEntry(hdr, txn); // serialize header + record
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header " +
                "and txn");
    }
    // compute the checksum of the serialized bytes
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    oa.writeLong(crc.getValue(), "txnEntryCRC"); // write the checksum, then the entry itself
    Util.writeTxnBytes(oa, buf);

    return true;
}
```
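To make the entry layout concrete, here is a sketch that frames a payload the way the Javadoc above describes it: an 8-byte checksum, a 4-byte length, the payload, then the 0x42 end-of-record byte, using plain `DataOutputStream` and `java.util.zip.Adler32`. As in `append()`, the checksum here covers only the serialized payload bytes. `TxnFraming` and its methods are hypothetical; this is not ZooKeeper's actual reader or writer.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;

// Hypothetical framing of one log entry: checksum(8) | len(4) | payload | 0x42
class TxnFraming {
    static final byte EOR = 0x42; // end-of-record marker

    static long checksum(byte[] payload) {
        Adler32 crc = new Adler32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    static void writeEntry(DataOutputStream out, byte[] payload) throws IOException {
        out.writeLong(checksum(payload));
        out.writeInt(payload.length);
        out.write(payload);
        out.writeByte(EOR);
    }

    // Returns the payload, or throws if the entry is corrupt or truncated.
    static byte[] readEntry(DataInputStream in) throws IOException {
        long expected = in.readLong();
        int len = in.readInt();
        if (len < 0) throw new IOException("bad length " + len);
        byte[] payload = new byte[len];
        in.readFully(payload);
        if (in.readByte() != EOR) throw new IOException("missing EOR marker");
        if (checksum(payload) != expected) throw new IOException("checksum mismatch");
        return payload;
    }
}
```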
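The zxid (ZooKeeper Transaction Id) used here is somewhat like MySQL's GTID. Every change to ZooKeeper's state produces a zxid, and zxids are globally ordered: if zxid1 is less than zxid2, then zxid1 happened before zxid2.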
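A zxid is a 64-bit number: the high 32 bits hold the leader epoch and the low 32 bits a counter within that epoch, so comparing two zxids as plain longs orders them. The helpers below mirror that layout and are written here for illustration only (ZooKeeper has its own equivalents in `ZxidUtils`); `logName` follows the `log.<hex zxid>` naming that `Util.makeLogName` produces, which is why the file created in `append()` is named after the first zxid it contains.

```java
// Illustrative helpers for the zxid layout: [ epoch (32 bits) | counter (32 bits) ]
class Zxid {
    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epoch(long zxid) {
        return zxid >> 32;
    }

    static long counter(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Transaction log file name for a log whose first entry has this zxid.
    static String logName(long zxid) {
        return "log." + Long.toHexString(zxid);
    }
}
```

For example, `Zxid.make(1, 5)` is `0x100000005`, and any zxid from epoch 2 compares greater than every zxid from epoch 1.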
append() only writes into the buffered stream; commit() is what makes the entries durable:

```java
/**
 * commit the logs. make sure that everything hits the
 * disk
 */
public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush();
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush();
        if (forceSync) {
            long startSyncNS = System.nanoTime();

            FileChannel channel = log.getChannel();
            channel.force(false); // sync file content, not necessarily metadata (similar to fdatasync)

            syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
            if (syncElapsedMS > fsyncWarningThresholdMS) {
                if (serverStats != null) {
                    serverStats.incrementFsyncThresholdExceedCount();
                }
                LOG.warn("fsync-ing the write ahead log in "
                        + Thread.currentThread().getName()
                        + " took " + syncElapsedMS
                        + "ms which will adversely effect operation latency. "
                        + "File size is " + channel.size() + " bytes. "
                        + "See the ZooKeeper troubleshooting guide");
            }
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.removeFirst().close();
    }
}
```
In memory, each znode is represented by a DataNode:

```java
public class DataNode implements Record {
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;

    // .....
}
```
```java
public class DataTree {
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();

    // .......
}
```
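As we can see, DataTree essentially stores its DataNodes (ephemeral nodes included) in a ConcurrentHashMap that maps each node's path to its DataNode. The separate `ephemerals` map only records, per session id, the paths of that session's ephemeral nodes.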
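To dump every node (as the snapshot does), ZooKeeper walks the tree from the root rather than iterating over the ConcurrentHashMap. serializeNode still fetches each node from the map through getNode(); what the tree walk provides is order: nodes are written in pre-order, every parent before its children, which deserialize relies on when it looks up each node's parent.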
```java
/**
 * this method uses a stringbuilder to create a new path for children. This
 * is faster than string appends ( str1 + str2).
 *
 * @param oa
 *            OutputArchive to write to.
 * @param path
 *            a string builder.
 * @throws IOException
 * @throws InterruptedException
 */
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
    String pathString = path.toString();
    DataNode node = getNode(pathString);
    if (node == null) {
        return;
    }
    String children[] = null;
    DataNode nodeCopy;
    synchronized (node) {
        StatPersisted statCopy = new StatPersisted();
        copyStatPersisted(node.stat, statCopy);
        //we do not need to make a copy of node.data because the contents
        //are never changed
        nodeCopy = new DataNode(node.data, node.acl, statCopy);
        Set<String> childs = node.getChildren();
        children = childs.toArray(new String[childs.size()]);
    }
    serializeNodeData(oa, pathString, nodeCopy);
    path.append('/');
    int off = path.length();
    for (String child : children) {
        // since this is single buffer being reused
        // we need
        // to truncate the previous bytes of string.
        path.delete(off, Integer.MAX_VALUE);
        path.append(child);
        serializeNode(oa, path);
    }
}
```
The tree is rebuilt from a snapshot by DataTree.deserialize:

```java
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    String path = ia.readString("path");
    while (!"/".equals(path)) {
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException("Invalid Datatree, unable to find " +
                        "parent " + parentPath + " of path " + path);
            }
            parent.addChild(path.substring(lastSlash + 1));
            long eowner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(eowner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (eowner != 0) {
                HashSet<String> list = ephemerals.get(eowner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(eowner, list);
                }
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    nodes.put("/", root);
    // we are done with deserializing the
    // the datatree
    // update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();

    aclCache.purgeUnused();
}
```
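Why the order matters: `deserialize` looks up each node's parent in `nodes` and fails if it is not there yet, so the snapshot must list parents before children, which the pre-order recursion in `serializeNode` guarantees. A stripped-down sketch of the same pairing, assuming a plain map from path to child names (`TreeOrder` is a hypothetical class). Like `DataTree`, it uses `""` for the root and builds child paths as `parent + "/" + child`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class TreeOrder {
    // Pre-order walk: a node is emitted before any of its children.
    static void serialize(Map<String, Set<String>> children, String path, List<String> out) {
        out.add(path);
        for (String child : children.getOrDefault(path, Set.of())) {
            serialize(children, path + "/" + child, out);
        }
    }

    // Rebuilds the parent -> children map; fails if a parent has not been seen yet.
    static Map<String, Set<String>> deserialize(List<String> paths) {
        Map<String, Set<String>> tree = new HashMap<>();
        for (String path : paths) {
            tree.put(path, new TreeSet<>());
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) continue; // "" is the root
            String parent = path.substring(0, lastSlash);
            Set<String> siblings = tree.get(parent);
            if (siblings == null) {
                throw new IllegalStateException("unable to find parent " + parent + " of path " + path);
            }
            siblings.add(path.substring(lastSlash + 1));
        }
        return tree;
    }
}
```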
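As mentioned earlier, with an in-memory data structure plus a WAL, the WAL keeps growing, and snapshots spare the system from replaying all of it at startup. ZooKeeper writes snapshots in FileSnap.serialize: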
```java
/**
 * serialize the datatree and session into the file snapshot
 * @param dt the datatree to be serialized
 * @param sessions the sessions to be serialized
 * @param snapShot the file to store snapshot into
 */
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
        throws IOException {
    if (!close) {
        try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
             CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
            //CheckedOutputStream cout = new CheckedOutputStream()
            OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            long val = crcOut.getChecksum().getValue();
            oa.writeLong(val, "val");
            oa.writeString("/", "path");
            sessOS.flush();
        }
    } else {
        throw new IOException("FileSnap has already been closed");
    }
}
```
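The basics of Java IO are not covered here; interested readers can look them up or read the author's article "Starting from a Piece of Code: A Brief Look at Java IO Interfaces".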
On startup, FileSnap.deserialize loads the most recent valid snapshot:

```java
/**
 * deserialize a data tree from the most recent snapshot
 * @return the zxid of the snapshot
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
        throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot " + snap);
        try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
             CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            deserialize(dt, sessions, ia);
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }
            foundValid = true;
            break;
        } catch (IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
    return dt.lastProcessedZxid;
}
```
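The recovery strategy above (try the newest snapshots first and fall back to an older one when the Adler32 check fails) can be sketched with `CheckedOutputStream` and `CheckedInputStream` from `java.util.zip`. Here each snapshot is just a byte array holding a string followed by its checksum, and `SnapshotPicker` is a hypothetical class; the real code reads files and the full DataTree and session format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

class SnapshotPicker {
    // Writes the state, then the Adler32 of everything written so far
    // (the same order FileSnap.serialize uses: payload first, then "val").
    static byte[] write(String state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        CheckedOutputStream crcOut = new CheckedOutputStream(bos, new Adler32());
        DataOutputStream out = new DataOutputStream(crcOut);
        out.writeUTF(state);
        out.flush();
        long val = crcOut.getChecksum().getValue();
        out.writeLong(val);
        out.flush();
        return bos.toByteArray();
    }

    // Returns the state from the first snapshot (newest first) whose checksum verifies.
    static String readNewestValid(List<byte[]> newestFirst) throws IOException {
        for (byte[] snap : newestFirst) {
            try (CheckedInputStream crcIn = new CheckedInputStream(new ByteArrayInputStream(snap), new Adler32());
                 DataInputStream in = new DataInputStream(crcIn)) {
                String state = in.readUTF();
                long checkSum = crcIn.getChecksum().getValue(); // checksum of the bytes read so far
                long val = in.readLong();
                if (val == checkSum) return state;
            } catch (IOException e) {
                // corrupt or truncated snapshot: fall back to the next (older) one
            }
        }
        throw new IOException("Not able to find valid snapshots");
    }
}
```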
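In this article we have walked through ZooKeeper's underlying storage technology. One last note: ZooKeeper's serialization uses Apache Jute, which essentially calls Java's DataOutput and DataInput; since it is fairly simple, it is not covered here.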