在前兩篇關於 zookeeper
的分析文章中,咱們熟悉了其選舉投票及集羣間數據同步的過程,本文將針對 zookeeper
啓動前的一個重要處理過程進行分析;也便是數據初始化
。java
經過調用追蹤咱們發現 zookeeper
最終會執行 ZKDatabase
的方法 loadDataBase
來完成數據初始化數據庫
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
// 將事務日誌包裝成 proposal 集羣模式下會同步給follower
addCommittedProposal(r);
}
};
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
initialized = true;
return zxid;
}
複製代碼
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加載至內存
snapLog.deserialize(dt, sessions);
// ......
}
複製代碼
下面咱們將針對整個過程進行分析:數組
FileSnap
經過反序列化快照文件,將數據寫入內存數據庫 DataTree
bash
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// 查找有效的快照文件
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
// 省略
}
複製代碼
從 deserialize
實現可知, zookeeper
在處理快照文件時首先須要查找指定個數的有效快照文件; 若不存在快照文件則返回 -1 。session
private List<File> findNValidSnapshots(int n) throws IOException {
// 查找 snapDir 目錄下 snapshot 爲前綴的快照文件;並按照 zxid 大小降序排列
List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
int count = 0;
List<File> list = new ArrayList<File>();
for (File f : files) {
try {
// 判斷當前快照文件是否爲有效文件
if (Util.isValidSnapshot(f)) {
list.add(f);
count++;
// 當有效快照文件個數 == n 時退出循環
if (count == n) {
break;
}
}
} catch (IOException e) {
LOG.info("invalid snapshot " + f, e);
}
}
return list;
}
複製代碼
public static boolean isValidSnapshot(File f) throws IOException {
if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
return false;
// Check for a valid snapshot
// 隨機讀文件
RandomAccessFile raf = new RandomAccessFile(f, "r");
try {
// including the header and the last / bytes
// the snapshot should be atleast 10 bytes
// 文件內容長度至少爲 10 字節
if (raf.length() < 10) {
return false;
}
// 指針移動到文件末尾 5 個字節處
raf.seek(raf.length() - 5);
byte bytes[] = new byte[5];
int readlen = 0;
int l;
// 讀最後 5 字節至 bytes 字節數組中
while(readlen < 5 &&
(l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
readlen += l;
}
if (readlen != bytes.length) {
LOG.info("Invalid snapshot " + f
+ " too short, len = " + readlen);
return false;
}
ByteBuffer bb = ByteBuffer.wrap(bytes);
int len = bb.getInt();
byte b = bb.get();
// 前 4 字節爲整數 1,最後一個字節爲 / 則爲有效快照文件
if (len != 1 || b != '/') {
LOG.info("Invalid snapshot " + f + " len = " + len
+ " byte = " + (b & 0xff));
return false;
}
} finally {
raf.close();
}
return true;
}
複製代碼
從上述代碼實現可知,
zookeeper
的snap file
具備如下特性:less
1
+ 字符 /
能夠在後續生成快照文件時進行驗證
複製代碼
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// 查找有效的快照文件
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
// 反序列化文件
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
// 將文件內容寫入 dataTree
deserialize(dt,sessions, ia);
// 文件校驗和判斷
// 若 checkSum 不一致則繼續解析下一個快照文件
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
// 實際上當最近的一個快照文件爲有效文件的時候就會退出循環
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
// 沒有一個有效的快照文件,則啓動失敗
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
// 記錄最近的 zxid
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
return dt.lastProcessedZxid;
}
複製代碼
zookeeper
經過遍歷snap file
並反序列化解析snap file
將其寫入內存數據庫dataTree
, 在該過程當中會對check sum
進行檢查以肯定snap file
的正確性(通常來說當snap file
正確性經過後,只會解析最新的snap file
);若沒有一個正確的snap file
則拋出異常說明啓動失敗,反之dataTree
將會記錄lastProcessedZxid
。dom
下面咱們繼續對 FileTxSnapLog
的 restore
方法進行分析,zookeeper
在完成快照文件的處理以後,會加載事務日誌文件並處理this
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加載至內存
snapLog.deserialize(dt, sessions);
// 構造 FileTxnLog 實例
FileTxnLog txnLog = new FileTxnLog(dataDir);
// 經過logDir zxid 構造 FileTxnIterator 實例並執行初始化動做
// 加載比快照數據新的日誌文件並查找最接近快照的事務日誌記錄
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
// 省略
}
複製代碼
在執行spa
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
複製代碼
時會構建 FileTxnIterator
實例並執行 init
動做,代碼以下:指針
public FileTxnIterator(File logDir, long zxid) throws IOException {
this.logDir = logDir;
this.zxid = zxid;
init();
}
複製代碼
接下來咱們看下其 init
中如何處理事務日誌文件
void init() throws IOException {
storedFiles = new ArrayList<File>();
// 查找 logDir 目錄下的全部事務日誌記錄文件,並按 zxid 降序排列
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
// storedFiles 用來記錄 zxid 大於快照文件 zxid 的事務日誌文件
for (File f: files) {
if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
storedFiles.add(f);
}
// add the last logfile that is less than the zxid
else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
storedFiles.add(f);
break;
}
}
// 由於一個事務日誌文件中記錄了多條事務日誌,而事務日誌文件名的後綴是當前文件的第一條事務記錄的zxid
// 經過判斷 hdr.getZxid < zxid 查找最接近快照的事務記錄
goToNextLog();
if (!next())
return;
while (hdr.getZxid() < zxid) {
if (!next())
return;
}
}
複製代碼
從 init
的實現能夠看出 zookeeper
對事物日誌文件的處理流程以下:
hdr.getZxid() < zxid
查找最接近快照的事務日誌記錄上文中經過加載事務日誌文件查找到快照以後所提交的事務記錄,下面就看下如何處理這些事務記錄的
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
// 快照文件加載至內存
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
// 加載比快照數據新的日誌文件
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(higestZxid) > {}(next log) for type {}",
new Object[] { highestZxid, hdr.getZxid(),
hdr.getType() });
} else {
highestZxid = hdr.getZxid();
}
try {
// 處理事務日誌 寫入內存數據庫
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
// listener 回調將事務包裝爲 proposal 集羣模式下同步至 follower
listener.onTxnLoaded(hdr, itr.getTxn());
// 繼續下一條事務日誌
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
複製代碼
從 restore
的後續實現咱們能夠看出, zookeeper
在完成事務日誌文件加載以後,會依次處理日誌文件中每條事務記錄:
listener
回調事務加載事件;將事務包裝爲 proposal
存儲到 committedLog
列表中,並分別記錄 maxCommittedLog
,minCommittedLog
(參見上文中的PlayBackListener
)經過上文的分析,能夠歸納下 zookeeper
數據初始化的流程以下: