zookeeper-數據初始化分析

概述

在前兩篇關於 zookeeper 的分析文章中,咱們熟悉了其選舉投票及集羣間數據同步的過程,本文將針對 zookeeper 啓動前的一個重要處理過程進行分析;也便是數據初始化java

經過調用追蹤咱們發現 zookeeper 最終會執行 ZKDatabase 的方法 loadDataBase 來完成數據初始化數據庫

public long loadDataBase() throws IOException {
    PlayBackListener listener=new PlayBackListener(){
        public void onTxnLoaded(TxnHeader hdr,Record txn){
            Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
                    null, null);
            r.txn = txn;
            r.hdr = hdr;
            r.zxid = hdr.getZxid();
            // 將事務日誌包裝成 proposal 集羣模式下會同步給follower
            addCommittedProposal(r);
        }
    };
    
    long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    initialized = true;
    return zxid;
}
複製代碼
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加載至內存
    snapLog.deserialize(dt, sessions);
    
    // ......
}
複製代碼

下面咱們將針對整個過程進行分析:數組

加載快照文件

FileSnap 經過反序列化快照文件,將數據寫入內存數據庫 DataTreebash

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    
    // 省略
}
複製代碼

deserialize 實現可知, zookeeper 在處理快照文件時首先須要查找指定個數的有效快照文件; 若不存在快照文件則返回 -1 。session

查找有效的快照文件

private List<File> findNValidSnapshots(int n) throws IOException {
    // 查找 snapDir 目錄下 snapshot 爲前綴的快照文件;並按照 zxid 大小降序排列
    List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
    int count = 0;
    List<File> list = new ArrayList<File>();
    for (File f : files) {
        try {
            // 判斷當前快照文件是否爲有效文件
            if (Util.isValidSnapshot(f)) {
                list.add(f);
                count++;
                // 當有效快照文件個數 == n 時退出循環
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {
            LOG.info("invalid snapshot " + f, e);
        }
    }
    return list;
}
複製代碼
public static boolean isValidSnapshot(File f) throws IOException {
    if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
        return false;

    // Check for a valid snapshot
    // 隨機讀文件
    RandomAccessFile raf = new RandomAccessFile(f, "r");
    try {
        // including the header and the last / bytes
        // the snapshot should be atleast 10 bytes
    	// 文件內容長度至少爲 10 字節
        if (raf.length() < 10) {
            return false;
        }
        // 指針移動到文件末尾 5 個字節處
        raf.seek(raf.length() - 5);
        byte bytes[] = new byte[5];
        int readlen = 0;
        int l;
        // 讀最後 5 字節至 bytes 字節數組中
        while(readlen < 5 &&
              (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
            readlen += l;
        }
        if (readlen != bytes.length) {
            LOG.info("Invalid snapshot " + f
                    + " too short, len = " + readlen);
            return false;
        }
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        int len = bb.getInt();
        byte b = bb.get();
        // 前 4 字節爲整數 1,最後一個字節爲 / 則爲有效快照文件
        if (len != 1 || b != '/') {
            LOG.info("Invalid snapshot " + f + " len = " + len
                    + " byte = " + (b & 0xff));
            return false;
        }
    } finally {
        raf.close();
    }

    return true;
}
複製代碼

從上述代碼實現可知,zookeepersnap file 具備如下特性:less

  • 快照文件內容長度至少爲 10 字節
  • 快照文件末尾 5 個字節內容爲: 整形數字 1 + 字符 /
能夠在後續生成快照文件時進行驗證
複製代碼

解析快照文件

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // 查找有效的快照文件
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        return -1L;
    }
    File snap = null;
    boolean foundValid = false;
    for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
            LOG.info("Reading snapshot " + snap);
            snapIS = new BufferedInputStream(new FileInputStream(snap));
            crcIn = new CheckedInputStream(snapIS, new Adler32());
            // 反序列化文件
            InputArchive ia = BinaryInputArchive.getArchive(crcIn);
            // 將文件內容寫入 dataTree
            deserialize(dt,sessions, ia);
            // 文件校驗和判斷
            // 若 checkSum 不一致則繼續解析下一個快照文件
            long checkSum = crcIn.getChecksum().getValue();
            long val = ia.readLong("val");
            if (val != checkSum) {
                throw new IOException("CRC corruption in snapshot : " + snap);
            }

            // 實際上當最近的一個快照文件爲有效文件的時候就會退出循環
            foundValid = true;
            break;
        } catch(IOException e) {
            LOG.warn("problem reading snap file " + snap, e);
        } finally {
            if (snapIS != null) 
                snapIS.close();
            if (crcIn != null) 
                crcIn.close();
        } 
    }

    // 沒有一個有效的快照文件,則啓動失敗
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    // 記錄最近的 zxid
    dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
    return dt.lastProcessedZxid;
}
複製代碼

zookeeper 經過遍歷 snap file 並反序列化解析 snap file 將其寫入內存數據庫 dataTree, 在該過程當中會對 check sum 進行檢查以肯定 snap file 的正確性(通常來說當 snap file 正確性經過後,只會解析最新的 snap file);若沒有一個正確的 snap file 則拋出異常說明啓動失敗,反之 dataTree 將會記錄 lastProcessedZxiddom

加載事務日誌文件

下面咱們繼續對 FileTxSnapLogrestore 方法進行分析,zookeeper 在完成快照文件的處理以後,會加載事務日誌文件並處理this

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加載至內存
    snapLog.deserialize(dt, sessions);

    // 構造 FileTxnLog 實例
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 經過logDir zxid 構造 FileTxnIterator 實例並執行初始化動做
    // 加載比快照數據新的日誌文件並查找最接近快照的事務日誌記錄
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    // 省略
}
複製代碼

在執行spa

TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
複製代碼

時會構建 FileTxnIterator 實例並執行 init 動做,代碼以下:指針

public FileTxnIterator(File logDir, long zxid) throws IOException {
  this.logDir = logDir;
  this.zxid = zxid;
  init();
}
複製代碼

接下來咱們看下其 init 中如何處理事務日誌文件

void init() throws IOException {
    storedFiles = new ArrayList<File>();
    // 查找 logDir 目錄下的全部事務日誌記錄文件,並按 zxid 降序排列
    List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);

    // storedFiles 用來記錄 zxid 大於快照文件 zxid 的事務日誌文件
    for (File f: files) {
        if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
            storedFiles.add(f);
        }
        // add the last logfile that is less than the zxid
        else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
            storedFiles.add(f);
            break;
        }
    }
    // 由於一個事務日誌文件中記錄了多條事務日誌,而事務日誌文件名的後綴是當前文件的第一條事務記錄的zxid
    // 經過判斷 hdr.getZxid < zxid 查找最接近快照的事務記錄
    goToNextLog();
    if (!next())
        return;
    while (hdr.getZxid() < zxid) {
        if (!next())
            return;
    }
}
複製代碼

init 的實現能夠看出 zookeeper 對事物日誌文件的處理流程以下:

  • 首先查找 logDir 目錄下全部的事務日誌文件 (log 爲前綴) 並按 zxid 降序
  • 將事務日誌文件的後綴 zxid 與快照文件 zxid 進行比較,查找並記錄最接近快照的事務日誌文件
  • 讀取事務日誌文件,經過 hdr.getZxid() < zxid 查找最接近快照的事務日誌記錄

處理快照以後的事務

上文中經過加載事務日誌文件查找到快照以後所提交的事務記錄,下面就看下如何處理這些事務記錄的

public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
    // 快照文件加載至內存
    snapLog.deserialize(dt, sessions);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    // 加載比快照數據新的日誌文件
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    try {
        while (true) {
            // iterator points to 
            // the first valid txn when initialized
            hdr = itr.getHeader();
            if (hdr == null) {
                //empty logs 
                return dt.lastProcessedZxid;
            }
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error("{}(higestZxid) > {}(next log) for type {}",
                        new Object[] { highestZxid, hdr.getZxid(),
                                hdr.getType() });
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                // 處理事務日誌 寫入內存數據庫
                processTransaction(hdr,dt,sessions, itr.getTxn());
            } catch(KeeperException.NoNodeException e) {
               throw new IOException("Failed to process transaction type: " +
                     hdr.getType() + " error: " + e.getMessage(), e);
            }
            // listener 回調將事務包裝爲 proposal 集羣模式下同步至 follower
            listener.onTxnLoaded(hdr, itr.getTxn());
            // 繼續下一條事務日誌
            if (!itr.next()) 
                break;
        }
    } finally {
        if (itr != null) {
            itr.close();
        }
    }
    return highestZxid;
}
複製代碼

restore 的後續實現咱們能夠看出, zookeeper 在完成事務日誌文件加載以後,會依次處理日誌文件中每條事務記錄:

  • 按事務類型處理事務記錄,寫入內存數據庫
  • listener 回調事務加載事件;將事務包裝爲 proposal 存儲到 committedLog 列表中,並分別記錄 maxCommittedLogminCommittedLog (參見上文中的PlayBackListener

小結

經過上文的分析,能夠歸納下 zookeeper 數據初始化的流程以下:

  • 加載快照文件,獲取快照最新的 zxid
  • 加載事務日誌文件,獲取快照以後的事務記錄
  • 處理快照以後的事務記錄,寫入內存
相關文章
相關標籤/搜索