【Zookeeper】源碼分析之持久化(一)之FileTxnLog

1、前言算法

  前一篇已經分析了序列化,這篇接着分析Zookeeper的持久化過程源碼,持久化對於數據的存儲相當重要,下面進行詳細分析。數據庫

2、持久化整體框架apache

  持久化的類主要在包org.apache.zookeeper.server.persistence下,這次也主要是對其下的類進行分析,其包下整體的類結構以下圖所示。數組

  

  · TxnLog,接口類型,讀取事務性日誌的接口。session

  · FileTxnLog,實現TxnLog接口,添加了訪問該事務性日誌的API。app

  · Snapshot,接口類型,持久層快照接口。框架

  · FileSnap,實現Snapshot接口,負責存儲、序列化、反序列化、訪問快照。less

  · FileTxnSnapLog,封裝了TxnLog和SnapShot。dom

  · Util,工具類,提供持久化所需的API。ide

  下面先來分析TxnLog和FileTxnLog的源碼。

3、TxnLog源碼分析

  TxnLog是接口,規定了對日誌的響應操做。  

public interface TxnLog {
    
    /**
     * roll the current
     * log being appended to
     * @throws IOException 
     */
    // 回滾日誌
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    // 添加一個請求至事務性日誌
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    // 讀取事務性日誌
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事務性操做的最新zxid
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    // 清空日誌,與Leader保持同步
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 獲取數據庫的id
    long getDbId() throws IOException;
    
    /**
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事務並進行確認
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    // 關閉事務性日誌
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    // 讀取事務日誌的迭代器接口
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // 獲取事務頭部
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // 獲取事務
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // 下個事務
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        // 關閉文件釋放資源
        void close() throws IOException;
    }
}

  其中,TxnLog除了提供讀寫事務日誌的API外,還提供了一個用於讀取日誌的迭代器接口TxnIterator。

4、FileTxnLog源碼分析

  對於LogFile而言,其格式可分爲以下三部分

  LogFile:

    FileHeader TxnList ZeroPad

  FileHeader格式以下  

  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }

  TxnList格式以下

  TxnList:

    Txn || Txn TxnList

  Txn格式以下

  Txn:

    checksum Txnlen TxnHeader Record 0x42

  Txnlen格式以下

  Txnlen:
    len 4bytes

  TxnHeader格式以下

  TxnHeader: {

    sessionid 8bytes
    cxid 4bytes
      zxid 8bytes
    time 8bytes
    type 4bytes
  }

  ZeroPad格式以下

  ZeroPad:
    0 padded to EOF (filled during preallocation stage)

  瞭解LogFile的格式對於理解源碼會有很大的幫助。

  4.1 屬性  

public class FileTxnLog implements TxnLog {
    private static final Logger LOG;
    
    // 預分配大小 64M
    static long preAllocSize =  65536 * 1024;
    
    // 魔術數字,默認爲1514884167
    public final static int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    // 版本號
    public final static int VERSION = 2;

    /** Maximum time we allow for elapsed fsync before WARNing */
    // 進行同步時,發出warn以前所能等待的最長時間
    private final static long fsyncWarningThresholdMS;

    // 靜態屬性,肯定Logger、預分配空間大小和最長時間
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
    }
    
    // 最大(新)的zxid
    long lastZxidSeen;
    // 存儲數據相關的流
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;

    // log目錄文件
    File logDir;
    
    // 是否強制同步
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
    
    // 數據庫id
    long dbId;
    
    // 流列表
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    
    // 當前大小
    long currentSize;
    // 寫日誌文件
    File logFileWrite = null;
}

  4.2. 核心函數 

  1. append函數

    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事務頭部不爲空
            if (hdr.getZxid() <= lastZxidSeen) { // 事務的zxid小於等於最後的zxid
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { // 日誌流爲空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }
               
               // 
               logFileWrite = new File(logDir, ("log." + 
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               // 
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               // 序列化
               fhdr.serialize(oa, "fileheader");
               // Make sure that the magic number is written before padding.
               // 刷新到磁盤
               logStream.flush();
               
               // 當前通道的大小
               currentSize = fos.getChannel().position();
               // 添加fos
               streamsToFlush.add(fos);
            }
            
            // 填充文件
            padFile(fos);
            
            // Serializes transaction header and transaction data into a byte buffer.
            // 將事務頭和事務數據序列化成Byte Buffer
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) { // 爲空,拋出異常
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            // 生成一個驗證算法
            Checksum crc = makeChecksumAlgorithm();
            // Updates the current checksum with the specified array of bytes
            // 使用Byte數組來更新當前的Checksum
            crc.update(buf, 0, buf.length);
            // 寫long類型數據
            oa.writeLong(crc.getValue(), "txnEntryCRC");
            // Write the serialized transaction record to the output archive.
            // 將序列化的事務記錄寫入OutputArchive
            Util.writeTxnBytes(oa, buf);
            
            return true;
        }
        return false;
    }
View Code

  說明:append函數主要用作向事務日誌中添加一個條目,其大致步驟以下

  ① 檢查TxnHeader是否爲空,若不爲空,則進入②,不然,直接返回false

  ② 檢查logStream是否爲空(初始化爲空),若不爲空,則進入③,不然,進入⑤

  ③ 初始化寫數據相關的流和FileHeader,並序列化FileHeader至指定文件,進入④

  ④ 強制刷新(保證數據存到磁盤),並獲取當前寫入數據的大小。進入⑤

  ⑤ 填充數據,填充0,進入⑥

  ⑥ 將事務頭和事務序列化成ByteBuffer(使用Util.marshallTxnEntry函數),進入⑦

  ⑦ 使用Checksum算法更新步驟⑥的ByteBuffer。進入⑧

  ⑧ 將更新的ByteBuffer寫入磁盤文件,返回true

  append間接調用了padLog函數,其源碼以下 

    public static long padLogFile(FileOutputStream f,long currentSize,
            long preAllocSize) throws IOException{
        // 獲取位置
        long position = f.getChannel().position();
        if (position + 4096 >= currentSize) { // 計算後是否大於當前大小
            // 從新設置當前大小,剩餘部分填充0
            currentSize = currentSize + preAllocSize;
            fill.position(0);
            f.getChannel().write(fill, currentSize-fill.remaining());
        }
        return currentSize;
    }
View Code

  說明:其主要做用是當文件大小不滿64MB時,向文件填充0以達到64MB大小。

  2. getLogFiles函數  

    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        // 按照zxid對文件進行排序
        List<File> files = Util.sortDataDir(logDirList, "log", true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) { // 遍歷文件
            // 從文件中獲取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid > snapshotZxid) { // 跳過大於snapshotZxid的文件
                continue;
            }
            // the files
            // are sorted with zxid's
            if (fzxid > logZxid) { // 找出文件中最大的zxid(同時還須要小於等於snapshotZxid)
                logZxid = fzxid;
            }
        }
        // 文件列表
        List<File> v=new ArrayList<File>(5);
        for (File f : files) { // 再次遍歷文件
            // 從文件中獲取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid < logZxid) { // 跳太小於logZxid的文件
                continue;
            }
            // 添加
            v.add(f);
        }
        // 轉化成File[] 類型後返回
        return v.toArray(new File[0]);

    }
View Code

  說明:該函數的做用是找出剛剛小於或者等於snapshot的全部log文件。其步驟大體以下。

  ① 對全部log文件按照zxid進行升序排序,進入②

  ② 遍歷全部log文件並記錄剛剛小於或等於給定snapshotZxid的log文件的logZxid,進入③

    ③ 再次遍歷log文件,添加zxid大於等於步驟②中的logZxid的全部log文件,進入④

  ④ 轉化後返回

  getLogFiles函數調用了sortDataDir,其源碼以下: 

public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
    {
        if(files==null) 
            return new ArrayList<File>(0);
        // 轉化爲列表
        List<File> filelist = Arrays.asList(files);
        // 進行排序,Comparator是關鍵,根據zxid進行排序
        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
        return filelist;
    }
View Code

  說明:其用於排序log文件,能夠選擇根據zxid進行升序或降序。

  getLogFiles函數間接調用了getZxidFromName,其源碼以下: 

    // 從文件名中解析出zxid
    public static long getZxidFromName(String name, String prefix) {
        long zxid = -1;
        // 對文件名進行分割
        String nameParts[] = name.split("\\.");
        if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前綴相同
            try {
                // 轉化成長整形
                zxid = Long.parseLong(nameParts[1], 16);
            } catch (NumberFormatException e) {
            }
        }
        return zxid;
    }
View Code

  說明:getZxidFromName主要用做從文件名中解析zxid,而且須要從指定的前綴開始。

  3. getLastLoggedZxid函數 

    public long getLastLoggedZxid() {
        // 獲取已排好序的全部的log文件
        File[] files = getLogFiles(logDir.listFiles(), 0);
        // 獲取最大的zxid(最後一個log文件對應的zxid)
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        // 
        long zxid = maxLog;
        // 迭代器
        TxnIterator itr = null;
        try {
            // 新生FileTxnLog
            FileTxnLog txn = new FileTxnLog(logDir);
            // 開始讀取從給定zxid以後的全部事務
            itr = txn.read(maxLog);
            while (true) { // 遍歷
                if(!itr.next()) // 是否存在下一項
                    break;
                // 獲取事務頭
                TxnHeader hdr = itr.getHeader();
                // 獲取zxid
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            // 關閉迭代器
            close(itr);
        }
        return zxid;
    }
View Code

  說明:該函數主要用於獲取記錄在log中的最後一個zxid。其步驟大體以下

  ① 獲取已排好序的全部log文件,並從最後一個文件中取出zxid做爲候選的最大zxid,進入②

  ② 新生成FileTxnLog並讀取步驟①中zxid以後的全部事務,進入③

  ③ 遍歷全部事務並提取出相應的zxid,最後返回。

  其中getLastLoggedZxid調用了read函數,其源碼以下 

public TxnIterator read(long zxid) throws IOException {
        // 返回事務文件訪問迭代器
        return new FileTxnIterator(logDir, zxid);
    }
View Code

  說明:read函數會生成一個FileTxnIterator,其是TxnLog.TxnIterator的子類,以後在FileTxnIterator構造函數中會調用init函數,其源碼以下 

    void init() throws IOException {
        // 新生成文件列表
        storedFiles = new ArrayList<File>();
        // 進行排序
        List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
        for (File f: files) { // 遍歷文件
            if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大於等於指定zxid的文件
                storedFiles.add(f);
            }
            // add the last logfile that is less than the zxid
            else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一個zxid小於指定zxid的文件,而後退出
                storedFiles.add(f);
                break;
            }
        }
        // go to the next logfile
        // 進入下一個log文件
        goToNextLog();
        if (!next()) // 不存在下一項,返回
            return;
        while (hdr.getZxid() < zxid) { // 從事務頭中獲取zxid小於給定zxid,直到不存在下一項或者大於給定zxid時退出
            if (!next())
                return;
        }
    }
View Code

  說明:init函數用於進行初始化操做,會根據zxid的不一樣進行不一樣的初始化操做,在init函數中會調用goToNextLog函數,其源碼以下  

    private boolean goToNextLog() throws IOException {
        if (storedFiles.size() > 0) { // 存儲的文件列表大於0
            // 取最後一個log文件
            this.logFile = storedFiles.remove(storedFiles.size()-1);
            // 針對該文件,建立InputArchive
            ia = createInputArchive(this.logFile);
            // 返回true
            return true;
        }
        return false;
    }
View Code

  說明:goToNextLog表示選取下一個log文件,在init函數中還調用了next函數,其源碼以下  

    public boolean next() throws IOException {
        if (ia == null) { // 爲空,返回false
            return false;
        }
        try {
            // 讀取長整形crcValue
            long crcValue = ia.readLong("crcvalue");
            // 經過input archive讀取一個事務條目
            byte[] bytes = Util.readTxnBytes(ia);
            // Since we preallocate, we define EOF to be an
            if (bytes == null || bytes.length==0) { // 對bytes進行判斷
                throw new EOFException("Failed to read " + logFile);
            }
            // EOF or corrupted record
            // validate CRC
            // 驗證CRC
            Checksum crc = makeChecksumAlgorithm();
            // 更新
            crc.update(bytes, 0, bytes.length);
            if (crcValue != crc.getValue()) // 驗證不相等,拋出異常
                throw new IOException(CRC_ERROR);
            if (bytes == null || bytes.length == 0) // bytes爲空,返回false
                return false;
            // 新生成TxnHeader
            hdr = new TxnHeader();
            // 將Txn反序列化,而且將對應的TxnHeader反序列化至hdr,整個Record反序列化至record
            record = SerializeUtils.deserializeTxn(bytes, hdr);
        } catch (EOFException e) { // 拋出異常
            LOG.debug("EOF excepton " + e);
            // 關閉輸入流
            inputStream.close();
            // 賦值爲null
            inputStream = null;
            ia = null;
            hdr = null;
            // this means that the file has ended
            // we should go to the next file
            if (!goToNextLog()) { // 沒有log文件,則返回false
                return false;
            }
            // if we went to the next log file, we should call next() again
            // 繼續調用next
            return next();
        } catch (IOException e) {
            inputStream.close();
            throw e;
        }
        // 返回true
        return true;
    }
View Code

  說明:next表示將迭代器移動至下一個事務,方便讀取,next函數的步驟以下。

  ① 讀取事務的crcValue值,用於後續的驗證,進入②

  ② 讀取事務,使用CRC32進行更新並與①中的結果進行比對,若不相同,則拋出異常,不然,進入③

  ③ 將事務進行反序列化並保存至相應的屬性中(如事務頭和事務體),會肯定具體的事務操做類型。

  ④ 在讀取過程拋出異常時,會首先關閉流,而後再嘗試調用next函數(即進入下一個事務進行讀取)。

  4. commit函數  

    public synchronized void commit() throws IOException {
        if (logStream != null) {
            // 強制刷到磁盤
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) { // 遍歷流
            // 強制刷到磁盤
            log.flush();
            if (forceSync) { // 是否強制同步
                long startSyncNS = System.nanoTime();
                
                log.getChannel().force(false);
                // 計算流式的時間
                long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) { // 大於閾值時則會警告
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) { // 移除流並關閉
            streamsToFlush.removeFirst().close();
        }
    }
View Code

  說明:該函數主要用於提交事務日誌至磁盤,其大體步驟以下

  ① 若日誌流logStream不爲空,則強制刷新至磁盤,進入②

  ② 遍歷須要刷新至磁盤的全部流streamsToFlush並進行刷新,進入③

  ③ 判斷是否須要強制性同步,如是,則計算每一個流的流式時間並在控制檯給出警告,進入④

  ④ 移除全部流並關閉。

  5. truncate函數 

    public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            // 獲取迭代器
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            long pos = input.getPosition();
            // now, truncate at the current position
            // 從當前位置開始清空
            RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
            raf.setLength(pos);
            raf.close();
            while (itr.goToNextLog()) { // 存在下一個log文件
                if (!itr.logFile.delete()) { // 刪除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            // 關閉迭代器
            close(itr);
        }
        return true;
    }
View Code

  說明:該函數用於清空大於給定zxid的全部事務日誌。

5、總結

  對於持久化中的TxnLog和FileTxnLog的源碼分析就已經完成了,其源碼仍是相對簡單,也謝謝各位園友的觀看~ 

相關文章
相關標籤/搜索