Zookeeper學習筆記(二):存儲技術

本文首發於 泊浮目的簡書: https://www.jianshu.com/u/204...
版本 日期 備註
1.0 2020.3.12 文章首發

前言

在上篇文章中,咱們簡單提到了Zookeeper的幾個核心點。在這篇文章中,咱們就來探索其存儲技術。在開始前,讀者能夠考慮思考下列問題:java

  • Zookeeper的數據存儲是如何實現的?
  • Zookeeper進行一次寫操做的時候,會發生什麼?
  • Zookeeper進行一次讀操做的時候,會發生什麼?
  • 相比數據庫(以MySQL爲例)來講,其讀放大、寫放大、空間放大的優劣
  • 當一個Zookeeper新加入現有集羣時,如何同步現集羣中的數據?

抱着問題,咱們進入下面的內容。node

Zookeper本地存儲架構

衆所周知,Zookeeper不擅長大量數據的讀寫、頻繁的讀操做。這與其存儲模型息息相關——典型的LSM( Log-Structured Merge-Tree)。數據庫

在這裏簡單的介紹LSM模型。該模型於1996年發明,2006年從Bigtable開始纔開始收到關注。咱們能夠直譯爲日誌結構的合併樹,這樣能夠猜出這個數據結構的大概:以日誌爲基礎,不停的append,並組織成可合併的樹形式。apache

從圖中能夠看出來,這是一個多層結構。C0在內存裏,其他幾層都在磁盤中。當C0到達必定大小時,就會向下作Compaction。segmentfault

C0通常咱們稱爲MTable(MemoryTable),用來保存有序KV對的內存緩衝區。api

而磁盤中的Ck咱們稱爲SSTable(Sorted String Table),用於保存有序KV對的只讀文件。服務器

LSM存儲MVCC的key-value。每次更新一個key-value都會生成一個新版本,刪除一個key-value都會生成tomstone的新版本。session

前面提到了Log-Structured,其表如今數據在寫入前,會作WAL(Write Ahead Logging)。這是普遍使用的保證多Block數據寫入原子性的技術。通常在Block寫入以前,會先把新數據寫到一個日誌中。只有寫入END並調用Sync API,纔開始對Block開始寫入。若是在對block進行寫入的任什麼時候發生crash,均可以在重啓時使用WAL裏面的數據完成Block的寫入。數據結構

另外,經過WAL,咱們在提交一個操做以前只須要進行文件的順序寫入,從而減小了包含多Block文件操做的數據寫入延時。架構

典型基於LSM的存儲引擎有:

  • LevelDB:基於C++開發,Chrome的IndexedDB使用的是LevelDB
  • RocksDB:基於C++開發,功能豐富,應用普遍,例如CockroachDB、TiKV和Kafka Streams等。
  • Pebble:Go語言開發,應用於CockeroachDB。
  • BadgerDB:一種分離存儲key和value的LSM存儲引擎。
  • WiredTiger:WiredTiger除了支持B-Tree覺得,還支持LSM。

LSM的讀寫

當一次寫操做發生時

  1. 將操做寫入事務日誌(WAL)
  2. 寫入MTable

至此,寫操做就完成了。讀者可能會問萬一碰上Compaction呢?這是在後臺作的,並不阻塞寫入。

當一次讀操做發生時

  1. 尋找MTable裏的數據
  2. 若是MTable中沒有數據,則往SSTable中一個個找。

橫向對比B-Tree

咱們以存儲引擎放大指標(Amplification Factors)來對兩個存儲模型進行對比,在這裏,簡單介紹一下幾個指標:

  1. 讀放大(read amplification):一個查詢涉及的外部存儲讀操做次數。若是咱們查詢一個數據須要作3次外部存儲讀取,那麼讀放大就是3.
  2. 寫放大(write amplification):寫入外部存儲的數據量和寫入應用的數據量的比率。若是咱們對應用寫入了1MB,可是對外部存儲寫入了2MB,那麼寫放大就是2.
  3. 空間放大(space amplification):數據庫佔用的外部存儲量和應用自己數據量的比率。若是一個1MB的應用數據佔用了10MB,那麼空間放大就是10倍。

根據指標,咱們能夠做出一個簡單的對比:

LSM B+Tree
讀放大 一個讀操做要對多個Level上的SSTable進行讀操做 一個Key-value的讀操做涉及一個數據頁的讀操做,若干個索引頁的讀操做
寫放大 一個key-value值的寫操做要在多級的SSTable上進行 一個key-value的寫操做涉及數據頁的寫操做,若干個索引頁的寫操做
空間放大 在SSTable中存儲一個key-value的多個版本 索引頁和gragmentation

LSM和B+Tree在存儲性能上的比較:

  • 寫操做:LSM的一個寫操做涉及對日誌的追加操做和MTable的更新。而B+Tree一個寫操做對若個幹索引頁和一個數據頁進行讀寫操做,可能會致使屢次的隨機IO。因此LSM的寫操做通常比B+Tree的寫操做性能好。
  • 讀操做:LSM的一個讀操做須要對全部的SSTable的內容和MTable的內容進行合併。而在B+Tree上,一個讀操做對若干個索引頁和一個數據頁進行讀操做。因此B+Tree的讀操做性能通常比LSM的讀操做性能好。

LSM的優化

上面提到了LSM的優缺點,接下來,咱們來談一談常見的優化思路和Zk中採用的方案。

WAL優化方案1:Group Commit

通常的WAL中每次寫完END都要調用一次耗時的Sync API,這實際上是會影響到系統的性能。爲了解決這個問題,咱們能夠一次提交多個數據寫入——只在最後一個數據寫入的END日誌以後,才調用Sync API。like this:

  • without group commit: BEGIN Data1 END Sync BEGIN Data2 END Sync BEGIN Data3 END Sync
  • with group commit: BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync`

凡事都有代價,這可能會引發數據一致性相關的問題。

WAL優化方案2:File Padding

在往 WAL 裏面追加日誌的時候,若是當前的文件 block 不能保存新添加的日誌,就要爲文件分配新的 block,這要更新文件 inode 裏面的信息(例如 size)。若是咱們使用的是 HHD 的話,就要先 seek 到 inode 所在的位置,而後回到新添加 block 的位置進行日誌追加。爲了減小這些 seek,咱們能夠預先爲 WAL 分配 block。例如 ZooKeeper 就是每次爲 WAL 分配 64MB 的 block。

因此這也是Zookeeper不擅長讀寫大數據的緣由之一,這會引發大量的Block分配。

WAL優化方案3:Snapshot

若是咱們使用一個內存數據結構加 WAL 的存儲方案,WAL 就會一直增加。這樣在存儲系統啓動的時候,就要讀取大量的 WAL 日誌數據來重建內存數據。快照能夠解決這個問題。

除了解決啓動時間過長的問題以外,快照還能夠減小存儲空間的使用。WAL 的多個日誌條目有多是對同一個數據的改動,經過快照,就能夠只保留最新的數據改動(Merge)。

Zk的確採用了這個方案來作優化。還帶來的一個好處是:在一個節點加入時,就會傳最新的Snapshot過去同步數據。

LSM的讀優化:Bloom Filter

經過Bloom過濾器來加速查找——這是一種隨機的數據結構,能夠在O(1)的時間內判斷一個給定的元素是否在集合中。注意:Bloom Filiter是可能產生Flase positive的(元素可能不在集合中)。

你能夠將Bloom過濾器理解爲一個哈希字典,可是真實的Bloom過濾器會比哈希字典複雜點,但這樣想並不影響你去理解它。

這麼看起來,在最壞的狀況下,Bloom Filter彷佛沒什麼用。

LSM的寫優化:Compation

若是咱們一直寫入MTable,那麼MTable則會一直增大,直到超出服務器內部限制。因此咱們須要把MTable的內存數據放到Durable Storage 上去,生成 SSTable 文件,這個操做叫作 minor Compaction。另外,還有兩類常見的Compaction:

  • Merge Compaction:把連續Level的SSTable和MTable合併成一個SSTable。目的是減小讀取操做要讀取的SSTable數量。
  • Major Compaction:合併全部Level上的SSTable的Merge Compaction。目的在於完全刪除Tomstone數據,並釋放全部的存儲空間。

Compaction的好處:

  1. 減小內存消耗
  2. 減小讀取事務日誌來恢復數據消耗的時間

源碼解析

本節內容都以3.5.7版本爲例

核心接口和類

  • TxnLog:接口類型,提供讀寫事務日誌的API。
  • FileTxnLog:基於文件的TxnLog實現。
  • Snapshot:快照接口類型,提供序列化、反序列化、訪問快照API。
  • FileSnapshot:基於文件的Snapshot實現。
  • FileTxnSnapLog:TxnLog和Snapshot的封裝
  • DataTree:Zookeeper的內存數據結構,ZNode構成的樹。
  • DataNode:表示一個ZNode。

TxnLog

TxnLog是咱們前面提到的事務日誌。那麼接下來咱們就來看它的相關源碼。

先看註釋:

package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

在註釋中,咱們能夠看到一個FileLog由三部分組成:

  • FileHeader
  • TxnList
  • ZerdPad

關於FileHeader,能夠理解其爲一個標示符。TxnList則爲主要內容。ZeroPad是一個終結符。

append

咱們來看看最典型的append方法,能夠將其理解WAL過程當中的核心方法額:

/**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) { //爲null意味着這是一個讀請求,直接返回
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream==null) { //爲空的話則new一個Stream
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");   //寫file header
           // Make sure that the magic number is written before padding.
           logStream.flush();      // zxid必須比日誌先落盤
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos); //加入須要Flush的隊列
        }
        filePadding.padFile(fos.getChannel());   //擴容。每次64m擴容
        byte[] buf = Util.marshallTxnEntry(hdr, txn);  //序列化寫入
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
         //生成butyArray的checkSum
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");//寫入日誌裏
        Util.writeTxnBytes(oa, buf);
        return true;
    }

這裏有個zxid(ZooKeeper Transaction Id),有點像MySQL的GTID。每次對Zookeeper的狀態的改變都會產生一個zxid,zxid是全局有序的,若是zxid1小於zxid2,則zxid1在zxid2以前發生。

commit

這個方法被調用的時機大體有:

  • 服務端比較閒的時候去調用
  • 到請求數量超出1000時,調用。以前提到過GroupCommit,其實就是在這個時候調用的。
  • zk的shutdown鉤子被調用時,調用
/**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);//對應fdataSync

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }

代碼很是的簡單。若是logStream還有,那就先刷下去。而後遍歷待flush的隊列(是個鏈表,用來保持操做順序),同時還會關注寫入的時間,若是過長,則會打一個Warn的日誌。

DataTree和DataNode

DataTree是Zk的內存數據結構——就是咱們以前說到的MTable。它以樹狀結構來組織DataNode。

這麼聽起來可能有點雲裏霧裏,不妨直接看一下DataNode的相關代碼。

public class DataNode implements Record {
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;
.....
}

若是用過ZkClient的小夥伴,可能很是熟悉。這就是咱們根據一個path獲取數據時返回的相關屬性——這就是用來描述存儲數據的一個類。注意,DataNode還會維護它的Children。

簡單瞭解DataNode後,咱們來看一下DataTree。爲了不干擾,咱們選出最關鍵的成員變量:

public class DataTree {
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();
    .......
}

咱們能夠看到,DataTree本質上是經過一個ConcurrentHashMap來存儲DataNode的(臨時節點也是)。保存的是 DataNode 的 path 到 DataNode 的映射。

那爲何要保存兩個狀態呢?這得看調用它們被調用的場景:

  • 通常CRUD ZNode的請求都是走ConcurrentHashMap的
  • 序列化DataTree的時候會從Root節點開始遍歷全部節點

若是須要獲取全部節點的信息,顯然遍歷樹會比一個個從ConcurrentHashMap 拿快。

接下來看一下序列化的相關代碼:

DataNode的序列化方法

/**
     * this method uses a stringbuilder to create a new path for children. This
     * is faster than string appends ( str1 + str2).
     *
     * @param oa
     *            OutputArchive to write to.
     * @param path
     *            a string builder.
     * @throws IOException
     * @throws InterruptedException
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
        String pathString = path.toString();
        DataNode node = getNode(pathString);
        if (node == null) {
            return;
        }
        String children[] = null;
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            Set<String> childs = node.getChildren();
            children = childs.toArray(new String[childs.size()]);
        }
        serializeNodeData(oa, pathString, nodeCopy);
        path.append('/');
        int off = path.length();
        for (String child : children) {
            // since this is single buffer being resused
            // we need
            // to truncate the previous bytes of string.
            path.delete(off, Integer.MAX_VALUE);
            path.append(child);
            serializeNode(oa, path);
        }
    }

能夠看到,的確是經過DataNode的Children來遍歷全部節點。

DataNode的反序列化方法

接下來看一下反序列化的代碼:

public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException("Invalid Datatree, unable to find " +
                            "parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }

由於序列化的時候是前序遍歷。因此反序列化時是先反序列化父親節點,再反序列化孩子節點。

Snapshot

那麼DataTree在什麼狀況下會序列化呢?在這裏就要提到快照了。

前面提到過:若是咱們使用一個內存數據結構加 WAL 的存儲方案,WAL 就會一直增加。這樣在存儲系統啓動的時候,就要讀取大量的 WAL 日誌數據來重建內存數據。快照能夠解決這個問題。

除了減小WAL日誌,Snapshot還會在Zk全量同步時被用到——當一個全新的ZkServer(這個通常叫Learner)被加入集羣時,Leader服務器會將本機上的數據全量同步給新來的ZkServer。

序列化

接下來看一下代碼入口:

/**
     * serialize the datatree and session into the file snapshot
     * @param dt the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param snapShot the file to store snapshot into
     */
    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
        if (!close) {
            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
            }
        } else {
            throw new IOException("FileSnap has already been closed");
        }
    }

JavaIO的基礎知識在這再也不介紹,有興趣的人能夠自行查閱資料或看 從一段代碼談起——淺談JavaIO接口

本質就是建立文件,並調用DataTree的序列化方法,DataTree的序列化其實就是遍歷DataNode去序列化,最後將這些序列化的內容寫入文件。

反序列化

/**
     * deserialize a data tree from the most recent snapshot
     * @return the zxid of the snapshot
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

簡單來講,先讀取Snapshot文件們。並反序列化它們,組成DataTree。

小結

在本文中,筆者和你們一塊兒學習了Zk的底層存儲技術。另外提一下,Zk中序列化技術用的是Apache Jute——本質上調用了JavaDataOutput和Input,較爲簡單。故沒在本文中展開。

相關文章
相關標籤/搜索