1、前言算法
前一篇已經分析了序列化,這篇接着分析Zookeeper的持久化過程源碼,持久化對於數據的存儲相當重要,下面進行詳細分析。數據庫
2、持久化整體框架apache
持久化的類主要在包org.apache.zookeeper.server.persistence下,這次也主要是對其下的類進行分析,其包下整體的類結構以下圖所示。數組
· TxnLog,接口類型,讀取事務性日誌的接口。session
· FileTxnLog,實現TxnLog接口,添加了訪問該事務性日誌的API。app
· Snapshot,接口類型,持久層快照接口。框架
· FileSnap,實現Snapshot接口,負責存儲、序列化、反序列化、訪問快照。less
· FileTxnSnapLog,封裝了TxnLog和SnapShot。dom
· Util,工具類,提供持久化所需的API。ide
下面先來分析TxnLog和FileTxnLog的源碼。
3、TxnLog源碼分析
TxnLog是接口,規定了對日誌的響應操做。
public interface TxnLog { /** * roll the current * log being appended to * @throws IOException */ // 回滾日誌 void rollLog() throws IOException; /** * Append a request to the transaction log * @param hdr the transaction header * @param r the transaction itself * returns true iff something appended, otw false * @throws IOException */ // 添加一個請求至事務性日誌 boolean append(TxnHeader hdr, Record r) throws IOException; /** * Start reading the transaction logs * from a given zxid * @param zxid * @return returns an iterator to read the * next transaction in the logs. * @throws IOException */ // 讀取事務性日誌 TxnIterator read(long zxid) throws IOException; /** * the last zxid of the logged transactions. * @return the last zxid of the logged transactions. * @throws IOException */ // 事務性操做的最新zxid long getLastLoggedZxid() throws IOException; /** * truncate the log to get in sync with the * leader. * @param zxid the zxid to truncate at. * @throws IOException */ // 清空日誌,與Leader保持同步 boolean truncate(long zxid) throws IOException; /** * the dbid for this transaction log. * @return the dbid for this transaction log. * @throws IOException */ // 獲取數據庫的id long getDbId() throws IOException; /** * commmit the trasaction and make sure * they are persisted * @throws IOException */ // 提交事務並進行確認 void commit() throws IOException; /** * close the transactions logs */ // 關閉事務性日誌 void close() throws IOException; /** * an iterating interface for reading * transaction logs. */ // 讀取事務日誌的迭代器接口 public interface TxnIterator { /** * return the transaction header. * @return return the transaction header. */ // 獲取事務頭部 TxnHeader getHeader(); /** * return the transaction record. * @return return the transaction record. */ // 獲取事務 Record getTxn(); /** * go to the next transaction record. * @throws IOException */ // 下個事務 boolean next() throws IOException; /** * close files and release the * resources * @throws IOException */ // 關閉文件釋放資源 void close() throws IOException; } }
其中,TxnLog除了提供讀寫事務日誌的API外,還提供了一個用於讀取日誌的迭代器接口TxnIterator。
4、FileTxnLog源碼分析
對於LogFile而言,其格式可分爲以下三部分
LogFile:
FileHeader TxnList ZeroPad
FileHeader格式以下
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList格式以下
TxnList:
Txn || Txn TxnList
Txn格式以下
Txn:
checksum Txnlen TxnHeader Record 0x42
Txnlen格式以下
Txnlen:
len 4bytes
TxnHeader格式以下
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
ZeroPad格式以下
ZeroPad:
0 padded to EOF (filled during preallocation stage)
瞭解LogFile的格式對於理解源碼會有很大的幫助。
4.1 屬性
public class FileTxnLog implements TxnLog { private static final Logger LOG; // 預分配大小 64M static long preAllocSize = 65536 * 1024; // 魔術數字,默認爲1514884167 public final static int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt(); // 版本號 public final static int VERSION = 2; /** Maximum time we allow for elapsed fsync before WARNing */ // 進行同步時,發出warn以前所能等待的最長時間 private final static long fsyncWarningThresholdMS; // 靜態屬性,肯定Logger、預分配空間大小和最長時間 static { LOG = LoggerFactory.getLogger(FileTxnLog.class); String size = System.getProperty("zookeeper.preAllocSize"); if (size != null) { try { preAllocSize = Long.parseLong(size) * 1024; } catch (NumberFormatException e) { LOG.warn(size + " is not a valid value for preAllocSize"); } } fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000); } // 最大(新)的zxid long lastZxidSeen; // 存儲數據相關的流 volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; // log目錄文件 File logDir; // 是否強制同步 private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");; // 數據庫id long dbId; // 流列表 private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>(); // 當前大小 long currentSize; // 寫日誌文件 File logFileWrite = null; }
4.2. 核心函數
1. append函數
public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr != null) { // 事務頭部不爲空 if (hdr.getZxid() <= lastZxidSeen) { // 事務的zxid小於等於最後的zxid LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } if (logStream==null) { // 日誌流爲空 if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: log." + Long.toHexString(hdr.getZxid())); } // logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); // FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); // 序列化 fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. // 刷新到磁盤 logStream.flush(); // 當前通道的大小 currentSize = fos.getChannel().position(); // 添加fos streamsToFlush.add(fos); } // 填充文件 padFile(fos); // Serializes transaction header and transaction data into a byte buffer. // 將事務頭和事務數據序列化成Byte Buffer byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { // 爲空,拋出異常 throw new IOException("Faulty serialization for header " + "and txn"); } // 生成一個驗證算法 Checksum crc = makeChecksumAlgorithm(); // Updates the current checksum with the specified array of bytes // 使用Byte數組來更新當前的Checksum crc.update(buf, 0, buf.length); // 寫long類型數據 oa.writeLong(crc.getValue(), "txnEntryCRC"); // Write the serialized transaction record to the output archive. // 將序列化的事務記錄寫入OutputArchive Util.writeTxnBytes(oa, buf); return true; } return false; }
說明:append函數主要用作向事務日誌中添加一個條目,其大致步驟以下
① 檢查TxnHeader是否爲空,若不爲空,則進入②,不然,直接返回false
② 檢查logStream是否爲空(初始化爲空),若不爲空,則進入③,不然,進入⑤
③ 初始化寫數據相關的流和FileHeader,並序列化FileHeader至指定文件,進入④
④ 強制刷新(保證數據存到磁盤),並獲取當前寫入數據的大小。進入⑤
⑤ 填充數據,填充0,進入⑥
⑥ 將事務頭和事務序列化成ByteBuffer(使用Util.marshallTxnEntry函數),進入⑦
⑦ 使用Checksum算法更新步驟⑥的ByteBuffer。進入⑧
⑧ 將更新的ByteBuffer寫入磁盤文件,返回true
append間接調用了padLog函數,其源碼以下
public static long padLogFile(FileOutputStream f,long currentSize, long preAllocSize) throws IOException{ // 獲取位置 long position = f.getChannel().position(); if (position + 4096 >= currentSize) { // 計算後是否大於當前大小 // 從新設置當前大小,剩餘部分填充0 currentSize = currentSize + preAllocSize; fill.position(0); f.getChannel().write(fill, currentSize-fill.remaining()); } return currentSize; }
說明:其主要做用是當文件大小不滿64MB時,向文件填充0以達到64MB大小。
2. getLogFiles函數
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) { // 按照zxid對文件進行排序 List<File> files = Util.sortDataDir(logDirList, "log", true); long logZxid = 0; // Find the log file that starts before or at the same time as the // zxid of the snapshot for (File f : files) { // 遍歷文件 // 從文件中獲取zxid long fzxid = Util.getZxidFromName(f.getName(), "log"); if (fzxid > snapshotZxid) { // 跳過大於snapshotZxid的文件 continue; } // the files // are sorted with zxid's if (fzxid > logZxid) { // 找出文件中最大的zxid(同時還須要小於等於snapshotZxid) logZxid = fzxid; } } // 文件列表 List<File> v=new ArrayList<File>(5); for (File f : files) { // 再次遍歷文件 // 從文件中獲取zxid long fzxid = Util.getZxidFromName(f.getName(), "log"); if (fzxid < logZxid) { // 跳太小於logZxid的文件 continue; } // 添加 v.add(f); } // 轉化成File[] 類型後返回 return v.toArray(new File[0]); }
說明:該函數的做用是找出剛剛小於或者等於snapshot的全部log文件。其步驟大體以下。
① 對全部log文件按照zxid進行升序排序,進入②
② 遍歷全部log文件並記錄剛剛小於或等於給定snapshotZxid的log文件的logZxid,進入③
③ 再次遍歷log文件,添加zxid大於等於步驟②中的logZxid的全部log文件,進入④
④ 轉化後返回
getLogFiles函數調用了sortDataDir,其源碼以下:
public static List<File> sortDataDir(File[] files, String prefix, boolean ascending) { if(files==null) return new ArrayList<File>(0); // 轉化爲列表 List<File> filelist = Arrays.asList(files); // 進行排序,Comparator是關鍵,根據zxid進行排序 Collections.sort(filelist, new DataDirFileComparator(prefix, ascending)); return filelist; }
說明:其用於排序log文件,能夠選擇根據zxid進行升序或降序。
getLogFiles函數間接調用了getZxidFromName,其源碼以下:
// 從文件名中解析出zxid public static long getZxidFromName(String name, String prefix) { long zxid = -1; // 對文件名進行分割 String nameParts[] = name.split("\\."); if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前綴相同 try { // 轉化成長整形 zxid = Long.parseLong(nameParts[1], 16); } catch (NumberFormatException e) { } } return zxid; }
說明:getZxidFromName主要用做從文件名中解析zxid,而且須要從指定的前綴開始。
3. getLastLoggedZxid函數
public long getLastLoggedZxid() { // 獲取已排好序的全部的log文件 File[] files = getLogFiles(logDir.listFiles(), 0); // 獲取最大的zxid(最後一個log文件對應的zxid) long maxLog=files.length>0? Util.getZxidFromName(files[files.length-1].getName(),"log"):-1; // if a log file is more recent we must scan it to find // the highest zxid // long zxid = maxLog; // 迭代器 TxnIterator itr = null; try { // 新生FileTxnLog FileTxnLog txn = new FileTxnLog(logDir); // 開始讀取從給定zxid以後的全部事務 itr = txn.read(maxLog); while (true) { // 遍歷 if(!itr.next()) // 是否存在下一項 break; // 獲取事務頭 TxnHeader hdr = itr.getHeader(); // 獲取zxid zxid = hdr.getZxid(); } } catch (IOException e) { LOG.warn("Unexpected exception", e); } finally { // 關閉迭代器 close(itr); } return zxid; }
說明:該函數主要用於獲取記錄在log中的最後一個zxid。其步驟大體以下
① 獲取已排好序的全部log文件,並從最後一個文件中取出zxid做爲候選的最大zxid,進入②
② 新生成FileTxnLog並讀取步驟①中zxid以後的全部事務,進入③
③ 遍歷全部事務並提取出相應的zxid,最後返回。
其中getLastLoggedZxid調用了read函數,其源碼以下
public TxnIterator read(long zxid) throws IOException { // 返回事務文件訪問迭代器 return new FileTxnIterator(logDir, zxid); }
說明:read函數會生成一個FileTxnIterator,其是TxnLog.TxnIterator的子類,以後在FileTxnIterator構造函數中會調用init函數,其源碼以下
void init() throws IOException { // 新生成文件列表 storedFiles = new ArrayList<File>(); // 進行排序 List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false); for (File f: files) { // 遍歷文件 if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大於等於指定zxid的文件 storedFiles.add(f); } // add the last logfile that is less than the zxid else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一個zxid小於指定zxid的文件,而後退出 storedFiles.add(f); break; } } // go to the next logfile // 進入下一個log文件 goToNextLog(); if (!next()) // 不存在下一項,返回 return; while (hdr.getZxid() < zxid) { // 從事務頭中獲取zxid小於給定zxid,直到不存在下一項或者大於給定zxid時退出 if (!next()) return; } }
說明:init函數用於進行初始化操做,會根據zxid的不一樣進行不一樣的初始化操做,在init函數中會調用goToNextLog函數,其源碼以下
private boolean goToNextLog() throws IOException { if (storedFiles.size() > 0) { // 存儲的文件列表大於0 // 取最後一個log文件 this.logFile = storedFiles.remove(storedFiles.size()-1); // 針對該文件,建立InputArchive ia = createInputArchive(this.logFile); // 返回true return true; } return false; }
說明:goToNextLog表示選取下一個log文件,在init函數中還調用了next函數,其源碼以下
public boolean next() throws IOException { if (ia == null) { // 爲空,返回false return false; } try { // 讀取長整形crcValue long crcValue = ia.readLong("crcvalue"); // 經過input archive讀取一個事務條目 byte[] bytes = Util.readTxnBytes(ia); // Since we preallocate, we define EOF to be an if (bytes == null || bytes.length==0) { // 對bytes進行判斷 throw new EOFException("Failed to read " + logFile); } // EOF or corrupted record // validate CRC // 驗證CRC Checksum crc = makeChecksumAlgorithm(); // 更新 crc.update(bytes, 0, bytes.length); if (crcValue != crc.getValue()) // 驗證不相等,拋出異常 throw new IOException(CRC_ERROR); if (bytes == null || bytes.length == 0) // bytes爲空,返回false return false; // 新生成TxnHeader hdr = new TxnHeader(); // 將Txn反序列化,而且將對應的TxnHeader反序列化至hdr,整個Record反序列化至record record = SerializeUtils.deserializeTxn(bytes, hdr); } catch (EOFException e) { // 拋出異常 LOG.debug("EOF excepton " + e); // 關閉輸入流 inputStream.close(); // 賦值爲null inputStream = null; ia = null; hdr = null; // this means that the file has ended // we should go to the next file if (!goToNextLog()) { // 沒有log文件,則返回false return false; } // if we went to the next log file, we should call next() again // 繼續調用next return next(); } catch (IOException e) { inputStream.close(); throw e; } // 返回true return true; }
說明:next表示將迭代器移動至下一個事務,方便讀取,next函數的步驟以下。
① 讀取事務的crcValue值,用於後續的驗證,進入②
② 讀取事務,使用CRC32進行更新並與①中的結果進行比對,若不相同,則拋出異常,不然,進入③
③ 將事務進行反序列化並保存至相應的屬性中(如事務頭和事務體),會肯定具體的事務操做類型。
④ 在讀取過程拋出異常時,會首先關閉流,而後再嘗試調用next函數(即進入下一個事務進行讀取)。
4. commit函數
public synchronized void commit() throws IOException { if (logStream != null) { // 強制刷到磁盤 logStream.flush(); } for (FileOutputStream log : streamsToFlush) { // 遍歷流 // 強制刷到磁盤 log.flush(); if (forceSync) { // 是否強制同步 long startSyncNS = System.nanoTime(); log.getChannel().force(false); // 計算流式的時間 long syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS); if (syncElapsedMS > fsyncWarningThresholdMS) { // 大於閾值時則會警告 LOG.warn("fsync-ing the write ahead log in " + Thread.currentThread().getName() + " took " + syncElapsedMS + "ms which will adversely effect operation latency. " + "See the ZooKeeper troubleshooting guide"); } } } while (streamsToFlush.size() > 1) { // 移除流並關閉 streamsToFlush.removeFirst().close(); } }
說明:該函數主要用於提交事務日誌至磁盤,其大體步驟以下
① 若日誌流logStream不爲空,則強制刷新至磁盤,進入②
② 遍歷須要刷新至磁盤的全部流streamsToFlush並進行刷新,進入③
③ 判斷是否須要強制性同步,如是,則計算每一個流的流式時間並在控制檯給出警告,進入④
④ 移除全部流並關閉。
5. truncate函數
public boolean truncate(long zxid) throws IOException { FileTxnIterator itr = null; try { // 獲取迭代器 itr = new FileTxnIterator(this.logDir, zxid); PositionInputStream input = itr.inputStream; long pos = input.getPosition(); // now, truncate at the current position // 從當前位置開始清空 RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw"); raf.setLength(pos); raf.close(); while (itr.goToNextLog()) { // 存在下一個log文件 if (!itr.logFile.delete()) { // 刪除 LOG.warn("Unable to truncate {}", itr.logFile); } } } finally { // 關閉迭代器 close(itr); } return true; }
說明:該函數用於清空大於給定zxid的全部事務日誌。
5、總結
對於持久化中的TxnLog和FileTxnLog的源碼分析就已經完成了,其源碼仍是相對簡單,也謝謝各位園友的觀看~