Zookeeper中寫操做都有維護一個事務日誌,專門用於寫znode操做。只有事務日誌確認過的數據變動纔會在整個集羣生效。node
一、TxnLog數據庫
TxnLog是寫事務日誌接口,主要包含如下接口:緩存
rollLog方法提供日誌滾動功能,若是任隨事務日誌文件無限增加,勢必影響到性能,rollLog方法會從新啓動新的事務日誌文件名,後續事務寫到新的文件中,ZooKeeper中多個事務日誌文件一般會以zxid來區別。app
Truncate方法提供截斷日誌功能,將zxid以後的事務所有刪除。一般當Follower的事務日誌比Leader還多的時候會觸發改方法,保證Follower和Leader的數據庫同步。dom
Commit方法提交文件緩存到磁盤,確保事務真實寫到磁盤而不是僅僅存在於文件內存緩存中。性能
二、實現類FileTxnLogthis
ZooKeeper中實現TxnLog接口的類是FileTxnLog,它的主要功能和方法包括如下這些。日誌
2.1 append方法code
主要代碼:接口
logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); logStream.flush(); currentSize = fos.getChannel().position(); streamsToFlush.add(fos); padFile(fos); Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);
該方法將事務添加到日誌文件的尾部。
日誌沒滾動前,寫到當前日誌文件;日誌回滾的話寫到新的日誌文件,新日誌文件的名稱是」log.」加當前zxid的值。
2.2 rollLog方法
日誌滾動,關閉舊的日誌文件,啓動新的日誌文件,主要代碼:
if (logStream != null) { this.logStream.flush(); this.logStream = null; }
2.3 getLastLoggedZxid方法
從日誌文件中獲取最近記錄的zxid的值,從lastLoggedZxid就能得到最新的事務日誌文件。多個日誌文件的狀況會遍歷全部文件,選出文件名中zxid最大的那個日誌文件,再從該日誌文件中取到最大的zxid。
主要代碼:
public long getLastLoggedZxid() { File[] files = getLogFiles(logDir.listFiles(), 0); long zxid = maxLog; TxnIterator itr = null; FileTxnLog txn = new FileTxnLog(logDir); itr = txn.read(maxLog); while (true) { if(!itr.next()) break; TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } return zxid; }
2.4 truncate方法
截斷多餘的日誌信息,保證日誌文件是合法有效的。
主要代碼:
public boolean truncate(long zxid) throws IOException { FileTxnIterator itr = null; itr = new FileTxnIterator(this.logDir, zxid); PositionInputStream input = itr.inputStream; long pos = input.getPosition(); RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw"); raf.setLength(pos); raf.close(); return true; }
2.5 commit方法
確認提交日誌文件緩存,主要代碼:
public synchronized void commit() throws IOException { for (FileOutputStream log : streamsToFlush) { log.flush(); if (forceSync) { long startSyncNS = System.nanoTime(); log.getChannel().force(false); } } }