ZooKeeper系列之(十五):寫日誌

Zookeeper中寫操做都有維護一個事務日誌,專門用於寫znode操做。只有事務日誌確認過的數據變動纔會在整個集羣生效。node

一、TxnLog數據庫

TxnLog是寫事務日誌接口,主要包含如下接口:緩存

  1. rollLog: 日誌滾動,啓動新日誌
  2. append:添加寫事務到日誌尾部
  3. getLastLoggedZxid:讀取最後記錄的zxid
  4. truncate:截斷寫事務,用在Learner事務比Leader事務多的場景
  5. commit:提交事務,確認事務已持久化

rollLog方法提供日誌滾動功能,若是任隨事務日誌文件無限增加,勢必影響到性能,rollLog方法會從新啓動新的事務日誌文件名,後續事務寫到新的文件中,ZooKeeper中多個事務日誌文件一般會以zxid來區別。app

Truncate方法提供截斷日誌功能,將zxid以後的事務所有刪除。一般當Follower的事務日誌比Leader還多的時候會觸發改方法,保證Follower和Leader的數據庫同步。dom

Commit方法提交文件緩存到磁盤,確保事務真實寫到磁盤而不是僅僅存在於文件內存緩存中。性能

二、實現類FileTxnLogthis

ZooKeeper中實現TxnLog接口的類是FileTxnLog,它的主要功能和方法包括如下這些。日誌

2.1 append方法code

主要代碼:接口

logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
padFile(fos);
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

該方法將事務添加到日誌文件的尾部。

日誌沒滾動前,寫到當前日誌文件;日誌回滾的話寫到新的日誌文件,新日誌文件的名稱是」log.」加當前zxid的值。

2.2 rollLog方法

日誌滾動,關閉舊的日誌文件,啓動新的日誌文件,主要代碼:

if (logStream != null) {
    this.logStream.flush();
    this.logStream = null;
}

2.3 getLastLoggedZxid方法

從日誌文件中獲取最近記錄的zxid的值,從lastLoggedZxid就能得到最新的事務日誌文件。多個日誌文件的狀況會遍歷全部文件,選出文件名中zxid最大的那個日誌文件,再從該日誌文件中取到最大的zxid。

主要代碼:

public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long zxid = maxLog;
    TxnIterator itr = null;
    FileTxnLog txn = new FileTxnLog(logDir);
    itr = txn.read(maxLog);
     while (true) {
        if(!itr.next())
             break;
        TxnHeader hdr = itr.getHeader();
        zxid = hdr.getZxid();
     }     
     return zxid;
 }

 

2.4 truncate方法

截斷多餘的日誌信息,保證日誌文件是合法有效的。

主要代碼:

public boolean truncate(long zxid) throws IOException {
    FileTxnIterator itr = null;
    itr = new FileTxnIterator(this.logDir, zxid);
    PositionInputStream input = itr.inputStream;
    long pos = input.getPosition();
    RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
    raf.setLength(pos);
    raf.close();
    return true;
 }

 

2.5 commit方法

確認提交日誌文件緩存,主要代碼:

public synchronized void commit() throws IOException {
   for (FileOutputStream log : streamsToFlush) {
       log.flush();
       if (forceSync) {
          long startSyncNS = System.nanoTime();
          log.getChannel().force(false);          
      }
  }
}
相關文章
相關標籤/搜索