Giraph 源碼分析(五)—— 加載數據+同步總結

做者|白松node

關於Giraph 共有九個章節,本文第五個章節。app

環境:在單機上(機器名:giraphx)啓動了2個workers。ide

輸入:SSSP文件夾,裏面有1.txt和2.txt兩個文件。工具

一、在Worker向Master彙報健康情況後,就開始等待Master建立InputSplit。oop

方法:每一個Worker經過檢某個Znode節點是否存在,同時在此Znode上設置Watcher。若不存在,就經過BSPEvent的waitForever()方法釋放當前線程的鎖,陷入等待狀態。一直等到master建立該znode。此步驟位於BSPServiceWorker類中的startSuperStep方法中,等待代碼以下:源碼分析

Giraph 源碼分析(五)—— 加載數據+同步總結
二、Master調用createInputSplits()方法建立InputSplit。線程

Giraph 源碼分析(五)—— 加載數據+同步總結

在generateInputSplits()方法中,根據用戶設定的VertexInputFormat得到InputSplits。代碼以下:3d

Giraph 源碼分析(五)—— 加載數據+同步總結

其中minSplitCountHint爲建立split的最小數目,其值以下:orm

minSplitCountHint = Workers數目 * NUM_INPUT_THREADS對象

NUM_INPUT_THREADS表示 每一個Input split loading的線程數目,默認值爲1 。 經查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint參數被忽略。用戶輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。

若是獲得的splits.size小於minSplitCountHint,那麼有些worker就沒被用上。

獲得split信息後,要把這些信息寫到Zookeeper上,以便其餘workers訪問。上面獲得的split信息以下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍歷splits List,爲每一個split建立一個Znode,值爲split的信息。如爲split-0建立Znode,值爲:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

爲split-1建立znode(以下),值爲:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最後建立znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示全部splits都建立好了。

三、Master根據splits建立Partitions。首先肯定partition的數目。

Giraph 源碼分析(五)—— 加載數據+同步總結

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>對象默認爲HashMasterPartitioner。它的createInitialPartitionOwners()方法以下:

Giraph 源碼分析(五)—— 加載數據+同步總結

上面代碼中是在工具類PartitionUtils計算Partition的數目,計算公式以下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默認值爲1 。

可見,partitionCount值爲4(122)。建立的partitionOwnerList信息以下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

四、Master建立Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用於後面的exchange partition。

五、Master最後在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等信息寫入Znode中(做爲Znode的data),該Znode的路徑爲: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master調用barrierOnWorkerList()方法開始等待各個Worker完成數據加載。調用關係以下:

Giraph 源碼分析(五)—— 加載數據+同步總結

barrierOnWorkerList中建立znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。而後檢查該znode的子節點數目是否等於workers的數目,若不等於,則線程陷入等待狀態。後面某個worker完成數據加載後,會建立子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來激活該線程繼續判斷。

六、當Master建立第5步的znode後,會激活worker。

每一個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,而後各個worker開始加載數據。

把partitionOwnerList複製給BSPServiceWorker類中的workerGraphPartitioner(默認爲HashWorkerPartitioner類型)對象的partitionOwnerList變量,後續每一個頂點把根據vertexID經過workerGraphPartitioner對象獲取其對應的partitionOwner。

Giraph 源碼分析(五)—— 加載數據+同步總結

每一個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節點,獲得inputSplitPathList,內容以下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

而後每一個Worker建立N個InputsCallable線程讀取數據。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默認值爲1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那麼,默認每一個worker就是建立一個線程來加載數據。

在InputSplitsHandler類中的reserveInputSplit()方法中,每一個worker都是遍歷inputSplitPathList,經過建立znode來保留(標識要處理)的split。代碼及註釋以下:

Giraph 源碼分析(五)—— 加載數據+同步總結

當用reserveInputSplit()方法獲取某個znode後,loadSplitsCallable類的loadInputSplit方法就開始經過該znode獲取其HDFS的路徑信息,而後讀入數據、重分佈數據。

Giraph 源碼分析(五)—— 加載數據+同步總結

Giraph 源碼分析(五)—— 加載數據+同步總結

VertexInputSplitsCallable類的readInputSplit()方法以下:

Giraph 源碼分析(五)—— 加載數據+同步總結

七、每一個worker加載完數據後,調用waitForOtherWorkers()方法等待其餘workers都處理完split。

Giraph 源碼分析(五)—— 加載數據+同步總結

策略以下,每一個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下建立子節點,後面追加本身的worker信息,如worker一、worker2建立的子節點分別以下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

建立完後,而後等待master建立/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

八、從第5步驟可知,若master發現/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節點數目等於workers的總數目,就會在coordinateInputSplits()方法中建立

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每一個worker,全部的worker都處理完了split。

九、最後就是就行全局同步。

master建立znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,而後再調用barrierOnWorkerList方法檢查該znode的子節點數目是否等於workers的數目,若不等於,則線程陷入等待狀態。等待worker建立子節點來激活該線程繼續判斷。

每一個worker獲取自身的Partition Stats,進入finishSuperStep方法中,等待全部的Request都被處理完;把自身的Aggregator信息發送給master;建立子節點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data爲該worker的partitionStatsList和workerSentMessages統計量;

最後調用waitForOtherWorkers()方法等待master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點。

master發現/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節點數目等於workers數目後,根據/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節點上的data收集每一個worker發送的aggregator信息,彙總爲globalStats。

Master若發現全局信息中(1)全部頂點都voteHalt且沒有消息傳遞,或(2)達到最大迭代次數 時,設置 globalStats.setHaltComputation(true)。告訴works結束迭代。

master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點,data爲globalStats。告訴全部workers當前超級步結束。

每一個Worker檢測到master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點後,讀出該znode的數據,即全局的統計信息。而後決定是否繼續下一次迭代。

十、同步以後開始下一個超級步。

十一、master和workers同步過程總結。

(1)master建立znode A,而後檢測A的子節點數目是否等於workers數目,不等於就陷入等待。某個worker建立一個子節點後,就會喚醒master進行檢測一次。

(2)每一個worker進行本身的工做,完成後,建立A的子節點A1。而後等待master建立znode B。

(3)若master檢測到A的子節點數目等於workers的數目時,建立Znode B

(4)master建立B 節點後,會激活各個worker。同步結束,各個worker就能夠開始下一個超步。

本質是經過znode B來進行全局同步的。

相關文章
相關標籤/搜索