Summary
This section covers: ZooKeeper's persistence framework; the transaction log FileTxnLog and its log structure; the FileTxnLog source; LogFormatter, which deserializes transaction logs; and an analysis of a transaction-log demo.
The overall persistence framework
The persistence classes live mainly in the package org.apache.zookeeper.server.persistence, structured as follows:
- TxnLog: interface type; the interface for reading the transaction log.
- FileTxnLog: implements TxnLog, adding the API for accessing that transaction log.
- SnapShot: interface type; the persistence-layer snapshot interface.
- FileSnap: implements SnapShot; responsible for storing, serializing, deserializing, and accessing snapshots.
- FileTxnSnapLog: wraps TxnLog and SnapShot.
- Util: utility class providing the APIs the persistence layer needs.
Two kinds of files
ZooKeeper mainly stores two kinds of files:
- snapshot: the in-memory snapshot.
- log: the transaction log, similar to MySQL's binlog; every operation that modifies data is recorded in the log.
For a definition of a transactional log, see the references at the end. In short: the ZooKeeper transaction log records transactional operations; every transactional operation, such as creating or deleting a node, appends one record to the log, which is used to recover the data after a ZooKeeper failure.
The transaction log is described below.
The transaction log
During normal operation, before returning an "update succeeded" response to the client, ZooKeeper makes sure the transaction log for that update has been written to disk; only then does the update take effect.
The TxnLog interface
public interface TxnLog {
    /**
     * roll the current
     * log being appended to
     * @throws IOException
     */
    // roll over from the current log file to the next one; this is NOT a rollback
    void rollLog() throws IOException;

    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false
     * @throws IOException
     */
    // append a request to the transaction log
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the
     * next transaction in the logs.
     * @throws IOException
     */
    // read the transaction log
    TxnIterator read(long zxid) throws IOException;

    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // the latest zxid among the logged transactions
    long getLastLoggedZxid() throws IOException;

    /**
     * truncate the log to get in sync with the
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException
     */
    // discard all log entries after the given zxid
    boolean truncate(long zxid) throws IOException;

    /**
     * the dbid for this transaction log.
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // get the database id
    long getDbId() throws IOException;

    /**
     * commit the transactions and make sure
     * they are persisted
     * @throws IOException
     */
    // commit the transactions and make sure they are persisted
    void commit() throws IOException;

    /**
     * close the transactions logs
     */
    // close the transaction log
    void close() throws IOException;

    /**
     * an iterating interface for reading
     * transaction logs.
     */
    // iterator interface for reading transaction logs
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // get the transaction header
        TxnHeader getHeader();

        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // get the transaction record
        Record getTxn();

        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // advance to the next transaction
        boolean next() throws IOException;

        /**
         * close files and release the
         * resources
         * @throws IOException
         */
        // close files and release resources
        void close() throws IOException;
    }
}
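To get a feel for the API before diving into the implementation, here is a minimal, hypothetical usage sketch (the data directory argument is an assumption; the header-then-next loop mirrors the iteration idiom ZooKeeper itself uses, e.g. in FileTxnSnapLog):

import java.io.File;

import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.txn.TxnHeader;

public class TxnLogWalk {
    public static void main(String[] args) throws Exception {
        // args[0]: a transaction log directory, e.g. <dataLogDir>/version-2 (hypothetical path)
        FileTxnLog txnLog = new FileTxnLog(new File(args[0]));
        TxnIterator it = txnLog.read(1); // iterate from the first zxid onwards
        try {
            while (true) {
                TxnHeader hdr = it.getHeader(); // null once the iterator runs past the last txn
                if (hdr == null) {
                    break;
                }
                System.out.println("zxid 0x" + Long.toHexString(hdr.getZxid())
                        + " type " + hdr.getType());
                if (!it.next()) {
                    break;
                }
            }
        } finally {
            it.close();
        }
    }
}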
The implementation: FileTxnLog
File format
/**
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 * }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *     calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 * }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
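To make the FileHeader layout concrete, here is a minimal standalone sketch (not ZooKeeper code; the class name is mine). It leans on the fact that Jute's BinaryOutputArchive writes ints and longs big-endian via DataOutputStream, so a plain DataInputStream can read the first 16 bytes back:

import java.io.DataInputStream;
import java.io.FileInputStream;

public class DumpFileHeader {
    public static void main(String[] args) throws Exception {
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            int magic = in.readInt();   // 4 bytes: "ZKLG" == 0x5a4b4c47
            int version = in.readInt(); // 4 bytes: currently 2
            long dbid = in.readLong();  // 8 bytes
            System.out.printf("magic=0x%08x version=%d dbid=%d%n", magic, version, dbid);
        }
    }
}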
Key methods
append
// append one transaction to the log
public synchronized boolean append(TxnHeader hdr, Record txn)
    throws IOException
{
    if (hdr != null) { // the transaction header is not null
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        }
        if (logStream == null) { // no log stream yet
            if (LOG.isInfoEnabled()) {
                LOG.info("Creating new log file: log." +
                        Long.toHexString(hdr.getZxid()));
            }
            // create a new log file
            logFileWrite = new File(logDir, ("log." +
                    Long.toHexString(hdr.getZxid())));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            // build the file header from TXNLOG_MAGIC, VERSION and dbId
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader"); // serialize it
            // Make sure that the magic number is written before padding.
            logStream.flush();
            currentSize = fos.getChannel().position();
            streamsToFlush.add(fos);
        }
        padFile(fos); // if less than 4KB is left, pad the file out (64MB preallocation)
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        Checksum crc = makeChecksumAlgorithm(); // create the checksum algorithm
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC"); // write the checksum value as a long
        Util.writeTxnBytes(oa, buf); // write the serialized txn entry, terminated with 0x42 ('B')
        return true;
    }
    return false;
}
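The checksum step is easy to reproduce in isolation. A small sketch of what append does here (makeChecksumAlgorithm() in FileTxnLog returns an Adler32; the placeholder bytes stand in for Util.marshallTxnEntry(hdr, txn)):

import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class CrcSketch {
    public static void main(String[] args) {
        // placeholder for the marshalled header + txn bytes
        byte[] buf = "serialized txn entry".getBytes();
        Checksum crc = new Adler32(); // same algorithm FileTxnLog uses
        crc.update(buf, 0, buf.length);
        // this is the value written before the entry under the tag "txnEntryCRC"
        System.out.println("txnEntryCRC = " + crc.getValue());
    }
}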
getLogFiles
// find the log file with the largest zxid <= the snapshot's zxid, plus all subsequent log files
public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
    List<File> files = Util.sortDataDir(logDirList, "log", true); // extract zxids from the suffixes and sort ascending
    long logZxid = 0;
    // Find the log file that starts before or at the same time as the
    // zxid of the snapshot
    for (File f : files) {
        long fzxid = Util.getZxidFromName(f.getName(), "log");
        if (fzxid > snapshotZxid) {
            continue;
        }
        // the files
        // are sorted with zxid's
        if (fzxid > logZxid) {
            logZxid = fzxid;
        }
    }
    List<File> v = new ArrayList<File>(5);
    for (File f : files) {
        long fzxid = Util.getZxidFromName(f.getName(), "log");
        if (fzxid < logZxid) {
            continue;
        }
        v.add(f);
    }
    return v.toArray(new File[0]);
}
getLastLoggedZxid
// get the last zxid recorded in the logs
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    // find the file containing the largest zxid
    long maxLog = files.length > 0 ?
            Util.getZxidFromName(files[files.length - 1].getName(), "log") : -1;
    // if a log file is more recent we must scan it to find
    // the highest zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        FileTxnLog txn = new FileTxnLog(logDir);
        itr = txn.read(maxLog);
        while (true) {
            if (!itr.next())
                break;
            TxnHeader hdr = itr.getHeader(); // walk the file to its last txn record
            zxid = hdr.getZxid(); // take its zxid
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}
commit
// commit the transaction log to disk
public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush(); // flush the buffered stream (into the OS; not an fsync)
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush(); // flush the stream
        if (forceSync) {
            long startSyncNS = System.nanoTime();
            log.getChannel().force(false); // force the data onto disk
            long syncElapsedMS =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
            if (syncElapsedMS > fsyncWarningThresholdMS) {
                LOG.warn("fsync-ing the write ahead log in "
                        + Thread.currentThread().getName()
                        + " took " + syncElapsedMS
                        + "ms which will adversely effect operation latency. "
                        + "See the ZooKeeper troubleshooting guide");
            }
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.removeFirst().close(); // remove and close the stream
    }
}
truncate
// discard all transaction log entries with zxid greater than the given one
public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    try {
        itr = new FileTxnIterator(this.logDir, zxid); // get an iterator positioned at the zxid
        PositionInputStream input = itr.inputStream;
        if (input == null) {
            throw new IOException("No log files found to truncate! This could " +
                    "happen if you still have snapshots from an old setup or " +
                    "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
        }
        long pos = input.getPosition();
        // now, truncate at the current position
        RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
        raf.setLength(pos); // cut off the rest of the current log (the entries with larger zxids)
        raf.close();
        while (itr.goToNextLog()) {
            if (!itr.logFile.delete()) { // delete all subsequent log files
                LOG.warn("Unable to truncate {}", itr.logFile);
            }
        }
    } finally {
        close(itr);
    }
    return true;
}
rollLog
Be sure to read the javadoc: this does not mean rolling back the log; it rolls over from the current log file to the next one.
/**
 * rollover the current log file to a new one.
 * @throws IOException
 */
public synchronized void rollLog() throws IOException {
    if (logStream != null) {
        this.logStream.flush();
        this.logStream = null;
        oa = null;
    }
}
Visualizing the transaction log: LogFormatter
This is best read alongside org.apache.zookeeper.server.persistence.FileTxnLog#append. Pass the path of a transaction log file as the only argument.
public static void main(String[] args) throws Exception {
    if (args.length != 1) {
        System.err.println("USAGE: LogFormatter log_file");
        System.exit(2);
    }
    FileInputStream fis = new FileInputStream(args[0]);
    BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
    FileHeader fhdr = new FileHeader();
    fhdr.deserialize(logStream, "fileheader"); // deserialize the header and validate it
    if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
        System.err.println("Invalid magic number for " + args[0]);
        System.exit(2);
    }
    System.out.println("ZooKeeper Transactional Log File with dbid "
            + fhdr.getDbid() + " txnlog format version "
            + fhdr.getVersion());

    int count = 0;
    while (true) {
        long crcValue;
        byte[] bytes;
        try {
            crcValue = logStream.readLong("crcvalue"); // read back the stored checksum
            bytes = logStream.readBuffer("txnEntry");
        } catch (EOFException e) {
            System.out.println("EOF reached after " + count + " txns.");
            return;
        }
        if (bytes.length == 0) {
            // Since we preallocate, we define EOF to be an
            // empty transaction
            System.out.println("EOF reached after " + count + " txns.");
            return;
        }
        Checksum crc = new Adler32();
        crc.update(bytes, 0, bytes.length);
        if (crcValue != crc.getValue()) { // compare the recomputed checksum with the stored one
            throw new IOException("CRC doesn't match " + crcValue +
                    " vs " + crc.getValue());
        }
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(bytes, hdr); // deserialize the txn
        System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,
                DateFormat.LONG).format(new Date(hdr.getTime()))
                + " session 0x"
                + Long.toHexString(hdr.getClientId())
                + " cxid 0x"
                + Long.toHexString(hdr.getCxid())
                + " zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);
        if (logStream.readByte("EOR") != 'B') {
            LOG.error("Last transaction was partial.");
            throw new EOFException("Last transaction was partial.");
        }
        count++;
    }
}
Sample LogFormatter output
For the demo posted at http://www.jianshu.com/p/d1f8b9d6ad57 (clear the transaction log directory beforehand), parsing the resulting log with LogFormatter produces:
ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x0 zxid 0x1 createSession 20000
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x2 zxid 0x2 create '/test1,#7a6e6f646531,v{s{31,s{'world,'anyone}}},T,1
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x3 zxid 0x3 create '/test2,#7a6e6f646532,v{s{31,s{'world,'anyone}}},T,2
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x4 zxid 0x4 create '/test3,#7a6e6f646533,v{s{31,s{'world,'anyone}}},T,3
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0x9 zxid 0x5 setData '/test2,#7a4e6f64653232,1
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xb zxid 0x6 delete '/test2
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xc zxid 0x7 delete '/test1
17-5-24 下午04時16分04秒 session 0x15c398687180000 cxid 0x0 zxid 0x8 closeSession null
EOF reached after 8 txns.
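The #... payloads in the create and setData lines are the node data printed as hex bytes. A quick sketch to decode them (for example, #7a6e6f646531 from the first create is the string "znode1"):

public class HexPayload {
    public static void main(String[] args) {
        String payload = "7a6e6f646531"; // the hex behind "#7a6e6f646531" above
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < payload.length(); i += 2) {
            sb.append((char) Integer.parseInt(payload.substring(i, i + 2), 16));
        }
        System.out.println(sb); // prints "znode1"
    }
}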
This is easy to follow with FileTxnLog#append in hand.
Gripes
Mismatched tags
When serializing, org.apache.zookeeper.server.persistence.FileTxnLog#append writes

oa.writeLong(crc.getValue(), "txnEntryCRC"); // write the checksum value as a long

while when deserializing, org.apache.zookeeper.server.LogFormatter#main reads

crcValue = logStream.readLong("crcvalue");

The two tags don't match. It doesn't affect execution, since the binary archive ignores tag names, but it is sloppy.
FileTxnLog#getLogFiles is inefficient
The files are already sorted by zxid in ascending order, so a single pass should be enough, as sketched below.
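A hypothetical single-pass rewrite, keeping the original semantics (find the last file whose zxid is <= snapshotZxid, then return it and everything after it; since the list is sorted ascending, one scan suffices):

public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
    List<File> files = Util.sortDataDir(logDirList, "log", true); // ascending by zxid
    int startIdx = 0;
    for (int i = 0; i < files.size(); i++) {
        long fzxid = Util.getZxidFromName(files.get(i).getName(), "log");
        if (fzxid > snapshotZxid) {
            break; // sorted, so every later file is also > snapshotZxid
        }
        startIdx = i; // last file so far that starts at or before the snapshot
    }
    List<File> v = files.subList(startIdx, files.size());
    return v.toArray(new File[0]);
}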
Thoughts
File suffixes are generated from the zxid
logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid())));

This makes locating a file by zxid convenient, as used, for example, in getLastLoggedZxid.
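A tiny sketch of the naming scheme in both directions (the zxid value here is made up):

public class LogName {
    public static void main(String[] args) {
        long zxid = 0x100000001L; // hypothetical zxid
        String name = "log." + Long.toHexString(zxid); // same expression as in append()
        long parsed = Long.parseLong(name.substring("log.".length()), 16);
        System.out.println(name + " -> 0x" + Long.toHexString(parsed)); // log.100000001 -> 0x100000001
    }
}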
What rollLog means
The method takes no arguments. Again: it rolls over from the current log file to the next one (say, when the current log has grown too large); it does not roll back records within the log. If it were a rollback, how could it work without being told which zxid to roll back to?
For comparison: rollLog leaves logStream null, so the next append creates a new log file (logFileWrite) and a new stream (logStream).
Both commit and rollLog call flush; what is the difference?
This involves FileChannel (Java NIO). The call chain that writes to the FileChannel is:

org.apache.zookeeper.server.persistence.FileTxnLog#append
-> org.apache.zookeeper.server.persistence.FileTxnLog#padFile
-> org.apache.zookeeper.server.persistence.Util#padLogFile
-> java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)

i.e. padding goes through FileChannel's write method.
commit, in addition, calls log.getChannel().force(false), i.e. java.nio.channels.FileChannel#force.
Resources such as https://java-nio.avenwu.net/java-nio-filechannel.html explain that the force method writes all not-yet-persisted data out to disk: for performance, the operating system keeps data in buffers, so there is no guarantee that data written through the channel has reached the disk until force is called explicitly. force takes a boolean argument indicating whether file metadata should be forced out as well. The sketch below illustrates the pattern.
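A minimal sketch of the difference described above (the file name is made up; flush hands data to the OS, force actually syncs it):

import java.io.FileOutputStream;

public class FlushVsForce {
    public static void main(String[] args) throws Exception {
        try (FileOutputStream fos = new FileOutputStream("demo.log")) {
            fos.write("payload".getBytes()); // data goes to the OS page cache
            fos.flush();                     // flushes user-space buffers only; still not durable
            fos.getChannel().force(false);   // fsync: forces the data (not metadata) to the device
        }
    }
}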
In other words, only commit actually pushes the data to disk; rollLog does not.
When is truncate called to discard part of the log?
In a cluster, when a learner syncs with the leader and the leader tells it to truncate before syncing. The caller is Learner#syncWithLeader, covered later in section 40.
Open questions
What does the flush call in rollLog accomplish?
The difference between commit and rollLog was covered above. rollLog calls flush, but to what end? It doesn't persist anything to disk (otherwise commit wouldn't need to be called afterwards). Does it write to memory? It doesn't touch FileChannel either. A plausible answer: logStream is a BufferedOutputStream, so its flush() only pushes the user-space buffer into the underlying FileOutputStream (i.e. into the OS page cache); durability still requires commit's force.
References
- http://www.cnblogs.com/leesf456/p/6279956.html
- 如何查看事務日誌 (how to view the transaction log)
- FileTxnLog
- 什麼是事務性日誌 (what is a transactional log)
- ZooKeeper運維之數據文件和事務日誌 (ZooKeeper operations: data files and transaction logs)