Giraph源碼分析(四)—— Master 如何檢查Worker啓動成功

本文的目的

說明Giraph如何藉助ZooKeeper來實現Master與Workers間的同步(不太肯定)。java

環境

在單機上(機器名:giraphx)啓動了2個workers。app

Giraph聽從單Master多Workers結構,BSPServiceMaster使用MasterThread線程來進行全局的同步。每一個Worker啓動成功後,會向Master彙報自身的健康情況,那麼Master是如何檢測Workers是否都成功啓動了?分佈式

Master在ZooKeeper上創兩個目錄,_workerHealthyDir和 _workerUnhealthyDir,分別用來記錄Healthy Workers和UnHealthy Workers。ide

主要在BspServiceMaster類中的getAllWorkerInfos()方法來完成,其調用關係以下,注意下getAllWorkerInfos()到MasterThread.run()方法調用關係,比較難找。oop

Giraph源碼分析(四)—— Master 如何檢查Worker啓動成功

建立的兩個目錄以下:源碼分析

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir

每一個Worker在setup()中,調用registerHealth()方法來註冊自身的狀態。線程

若自身是Healthy的,則在_workerHealthyDir目錄下添加子節點 /wokerInfo.getHostNameId(),不然在workerUnhealthyDir目錄下添加。wokerInfo.getHostNameId()爲:Hostname+「」+TaskId。 Task1和Task2 (Task 0是master) 建立的子節點以下:設計

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2

Master 在checkWorkers()方法中,在While死循環中(實際有超時限制),經過調用getAllWorkerInfos()方法來獲取_workerHealthyDir目錄下的子節點,而後比較子節點數目是否達到maxWorkers(啓動job時定義的,-w參數)。code

若小於maxWorkers,則繼續調用getAllWorkerInfos()方法進行下一輪檢測;若等於maxWorker,退出While循環,而後返回healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] 。blog

問題:因爲在分佈式環境中,每一個Worker和Maste都是並行運行,彼此不知道對方的運行狀況。上述第3步驟中,若還有子節點尚未建立,就一直在while死循環中調用來檢測getAllWorkerInfos()方法檢測,效率比較低下,固然也比較笨!

Giraph借用ZooKeeper來高效的進行檢測。設計理念以下:

  1. master在獲取子節點時,註冊Watcher(爲註冊器,用於觸發相應事件)。

Giraph源碼分析(四)—— Master 如何檢查Worker啓動成功

若某個task建立了子節點後,就會觸發Watcher事件。

若子節點數目小於maxWorkers,就調用 workerHealthRegistrationChanged的await()方法釋放當前線程的鎖,陷入等待狀態。不會進行無用的檢測。

說明:workerHealthRegistrationChanged爲PredicateLock類型(implements BspEvent接口),PredicateLock裏面使用可重入鎖 ReentrantLock和Condition進行線程的控制。

當某個task建立了子節點後,觸發Watcher事件。

調用BspService中的public final void Process(WatchedEvent event)事件,該方法根據事件的路徑來激活相應的BspEvent事件。此處對應的是:

Giraph源碼分析(四)—— Master 如何檢查Worker啓動成功

實驗運行以下:

s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy )

這樣就會激活master線程,開始下一輪檢測。

子節點數目等於maxWorkers時,就中止。

總結:每建立一個子節點時,纔會進行一次檢測,效率較高!

相關文章
相關標籤/搜索