zk源碼閱讀3: 持久化FileTxnLog

摘要

本節講解html

zk的持久化框架
事務日誌FileTxnLog日誌結構
FileTxnLog源碼
LogFormatter完成事務日誌的反序列化
分析事務日誌demo

持久化整體框架

持久化的類主要在包org.apache.zookeeper.server.persistence下,結構以下圖java

zk持久化框架

TxnLog,接口類型,讀取事務性日誌的接口。
FileTxnLog,實現TxnLog接口,添加了訪問該事務性日誌的API。
Snapshot,接口類型,持久層快照接口。
FileSnap,實現Snapshot接口,負責存儲、序列化、反序列化、訪問快照。
FileTxnSnapLog,封裝了TxnLog和SnapShot。
Util,工具類,提供持久化所需的API。

兩種日誌

zk主要存放了兩類文件算法

snapshot(內存快照)
log(事務日誌,相似MySQL的binlog,將全部與修改數據相關的操做記錄在log中)

關於事務性日誌的定義,能夠參照refer,簡而言之就是 zk事務日誌文件用來記錄事物操做,每個事務操做如添加,刪除節點等等,都會在事務日誌中記錄一條記錄,用來在zookeeper異常狀況下恢復數據數據庫

下面介紹事務日誌apache

事務日誌

正常運行過程當中,針對全部更新操做,在返回客戶端「更新成功」的響應前,ZK會確保已經將本次更新操做的事務日誌寫到磁盤上,只有這樣,整個更新操做纔會生效。session

接口TxnLog

public interface TxnLog {
    
    /**
     * roll the current
     * log being appended to
     * @throws IOException 
     */
    // 滾動日誌,從當前日誌滾到下一個日誌,不是回滾
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    // 添加一個請求至事務性日誌
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    // 讀取事務性日誌
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事務性操做的最新zxid
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    // 清空zxid之後的日誌
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 獲取數據庫的id
    long getDbId() throws IOException;
    
    /**
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事務並進行確認
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    // 關閉事務性日誌
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    // 讀取事務日誌的迭代器接口
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // 獲取事務頭部
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // 獲取事務
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // 下個事務
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        // 關閉文件釋放資源
        void close() throws IOException;
    }
}

實現類 FileTxnLog

文件結構

/**
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 * 
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 * 
 * TxnList:
 *     Txn || Txn TxnList
 *     
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 * 
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 * 
 * Txnlen:
 *     len 4bytes
 * 
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *     
 * Record:
 *     See Jute definition file for details on the various record types
 *      
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote> 
 */

主要接口

append
//添加一條事務性日誌
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事務頭部不爲空
            if (hdr.getZxid() <= lastZxidSeen) {
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { //日誌流爲空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }
               //生成一個新的log文件
               logFileWrite = new File(logDir, ("log." + 
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               //用TXNLOG_MAGIC VERSION dbId來生成文件頭
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               fhdr.serialize(oa, "fileheader");//序列化
               // Make sure that the magic number is written before padding.
               logStream.flush();
               currentSize = fos.getChannel().position();
               streamsToFlush.add(fos);
            }
            padFile(fos);//剩餘空間不夠4k時則填充文件64M
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) {
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            Checksum crc = makeChecksumAlgorithm();//生成驗證算法
            crc.update(buf, 0, buf.length);
            oa.writeLong(crc.getValue(), "txnEntryCRC");//將驗證算法的值寫入long
            Util.writeTxnBytes(oa, buf);//將序列化事務記錄寫入OutputArchive,以0x42('B')結束
            return true;
        }
        return false;
    }

添加事務性日誌 )app

getLogFiles

//找出<=snapshot的中最大的zxid的logfile以及後續的logfile
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        List<File> files = Util.sortDataDir(logDirList, "log", true);//按照後綴抽取zxid,按zxid升序排序
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid > snapshotZxid) {
                continue;
            }
            // the files
            // are sorted with zxid's
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid < logZxid) {
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);
    }

getLastLoggedZxid

//獲取記錄在log中的最後一個zxid
    public long getLastLoggedZxid() {
        File[] files = getLogFiles(logDir.listFiles(), 0);
        //找到最大的zxid所在的文件
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();//遍歷這個文件,找到最後一條事務日誌記錄
                zxid = hdr.getZxid();//取出zxid
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }

commit

//提交事務日誌至磁盤
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();// 強制刷到磁盤
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();// 強制刷到磁盤
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                log.getChannel().force(false);

                long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();// 移除流並關閉
        }
    }

truncate

//清空大於指定zxid的事務日誌
    public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            itr = new FileTxnIterator(this.logDir, zxid);//根據zxid找到迭代器
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            // now, truncate at the current position
            RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
            raf.setLength(pos);//把當前log後面的部分(zxid更大的)截斷
            raf.close();
            while (itr.goToNextLog()) {
                if (!itr.logFile.delete()) {//把後面的log文件都刪除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }

rollLog

這個必定要看註釋,意思不是回滾日誌,是從當前日誌滾到下一個框架

/**
     * rollover the current log file to a new one.
     * @throws IOException
     */
    public synchronized void rollLog() throws IOException {
        if (logStream != null) {
            this.logStream.flush();
            this.logStream = null;
            oa = null;
        }
    }

事務日誌可視化 LogFormatter

能夠結合org.apache.zookeeper.server.persistence.FileTxnLog#append進行理解 傳入參數爲對應的事務日誌路徑便可運維

public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("USAGE: LogFormatter log_file");
            System.exit(2);
        }
        FileInputStream fis = new FileInputStream(args[0]);
        BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
        FileHeader fhdr = new FileHeader();
        fhdr.deserialize(logStream, "fileheader");
        //反序列化header完成驗證
        if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
            System.err.println("Invalid magic number for " + args[0]);
            System.exit(2);
        }
        System.out.println("ZooKeeper Transactional Log File with dbid "
                + fhdr.getDbid() + " txnlog format version "
                + fhdr.getVersion());

        int count = 0;
        while (true) {
            long crcValue;
            byte[] bytes;
            try {
                crcValue = logStream.readLong("crcvalue");//獲取反序列化的checksum

                bytes = logStream.readBuffer("txnEntry");
            } catch (EOFException e) {
                System.out.println("EOF reached after " + count + " txns.");
                return;
            }
            if (bytes.length == 0) {
                // Since we preallocate, we define EOF to be an
                // empty transaction
                System.out.println("EOF reached after " + count + " txns.");
                return;
            }
            Checksum crc = new Adler32();
            crc.update(bytes, 0, bytes.length);
            if (crcValue != crc.getValue()) {//比較本身生成的checksum與傳遞過來的checksum
                throw new IOException("CRC doesn't match " + crcValue +
                        " vs " + crc.getValue());
            }
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(bytes, hdr);//反序列化事務
            System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,
                    DateFormat.LONG).format(new Date(hdr.getTime()))
                    + " session 0x"
                    + Long.toHexString(hdr.getClientId())
                    + " cxid 0x"
                    + Long.toHexString(hdr.getCxid())
                    + " zxid 0x"
                    + Long.toHexString(hdr.getZxid())
                    + " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);
            if (logStream.readByte("EOR") != 'B') {
                LOG.error("Last transaction was partial.");
                throw new EOFException("Last transaction was partial.");
            }
            count++;
        }
    }

反序列化事務記錄

事務日誌可視化效果

針對http://www.jianshu.com/p/d1f8b9d6ad57貼出的demo 利用LogFormatter進行解析,事先把事務日誌目錄清空 輸出爲dom

ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x0 zxid 0x1 createSession 20000

17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x2 zxid 0x2 create '/test1,#7a6e6f646531,v{s{31,s{'world,'anyone}}},T,1

17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x3 zxid 0x3 create '/test2,#7a6e6f646532,v{s{31,s{'world,'anyone}}},T,2

17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x4 zxid 0x4 create '/test3,#7a6e6f646533,v{s{31,s{'world,'anyone}}},T,3

17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0x9 zxid 0x5 setData '/test2,#7a4e6f64653232,1

17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xb zxid 0x6 delete '/test2

17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xc zxid 0x7 delete '/test1

17-5-24 下午04時16分04秒 session 0x15c398687180000 cxid 0x0 zxid 0x8 closeSession null
EOF reached after 8 txns.

結合FileTxnLog#append很好理解

吐槽

tag不匹配

序列化時
org.apache.zookeeper.server.persistence.FileTxnLog#append裏面是
oa.writeLong(crc.getValue(), "txnEntryCRC");//將驗證算法的值寫入long
反序列化,解析的時候是
org.apache.zookeeper.server.LogFormatter#main
crcValue = logStream.readLong("crcvalue");
這倆tag都不同,雖然並不影響運行!!!

FileTxnLog#getLogFiles效率低

都已經按zxid升序排序了,一次循環就該搞定了

思考

文件後綴是按照zxid來生成的

logFileWrite = new File(logDir, ("log." +  Long.toHexString(hdr.getZxid())));
這對於定位文件,zxid都提供了一些便利
好比在getLastLoggedZxid中的調用

rollLog函數的意義

函數沒有參數 必定要注意,是從當前日誌,滾到下一個日誌(好比日誌量太大了之類的場景) 不是回滾日誌裏面的記錄,試想回滾怎麼能不告訴回滾的zxid呢

能夠比較一下,rollLog函數形成logstream爲null,append函數便會生成新的文件logFileWrite,新的流logStream

commit和rollLog兩個函數都調用了flush,區別是什麼

涉及到FileChannel,nio相關,

寫入FileChannel調用鏈以下 org.apache.zookeeper.server.persistence.FileTxnLog#append org.apache.zookeeper.server.persistence.FileTxnLog#padFile org.apache.zookeeper.server.persistence.Util#padLogFile java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)

用了FileChannel的write方法

在commit函數中調用了 log.getChannel().force(false); 即java.nio.channels.FileChannel#force

查閱相關資料如 https://java-nio.avenwu.net/java-nio-filechannel.html 說明了

force方法會把全部未寫磁盤的數據都強制寫入磁盤。
這是由於在操做系統中出於性能考慮回把數據放入緩衝區,因此不能保證數據在調用write寫入文件通道後就及時寫到磁盤上了,除非手動調用force方法。 
force方法須要一個布爾參數,表明是否把meta data也一併強制寫入。

也就是隻有commit方法會進行真正的寫入磁盤,rollLog並無

事務日誌何時會調用truncate 清空部分日誌

集羣版learner向leader同步的時候,leader告訴learner須要回滾同步 調用方Learner#syncWithLeader,後面40節會講

問題

rollLog函數調用flush的做用

上面講了commit和rollLog兩個函數的區別 rollLog調用flush,最後的效果是什麼呢?又沒有寫入磁盤(不然不會再調用commit) 寫入了內存嗎?又沒有調用FileChannel的相關方法。

refer

http://www.cnblogs.com/leesf456/p/6279956.html 如何查看事務日誌 FileTxnLog 什麼是事務性日誌 ZooKeeper運維之數據文件和事務日誌

相關文章
相關標籤/搜索