說明Giraph如何藉助ZooKeeper來實現Master與Workers間的同步(不太肯定)。java
在單機上(機器名:giraphx)啓動了2個workers。app
Giraph聽從單Master多Workers結構,BSPServiceMaster使用MasterThread線程來進行全局的同步。每一個Worker啓動成功後,會向Master彙報自身的健康情況,那麼Master是如何檢測Workers是否都成功啓動了?分佈式
Master在ZooKeeper上創兩個目錄,_workerHealthyDir和 _workerUnhealthyDir,分別用來記錄Healthy Workers和UnHealthy Workers。ide
主要在BspServiceMaster類中的getAllWorkerInfos()方法來完成,其調用關係以下,注意下getAllWorkerInfos()到MasterThread.run()方法調用關係,比較難找。oop
建立的兩個目錄以下:源碼分析
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir
每一個Worker在setup()中,調用registerHealth()方法來註冊自身的狀態。線程
若自身是Healthy的,則在_workerHealthyDir目錄下添加子節點 /wokerInfo.getHostNameId(),不然在workerUnhealthyDir目錄下添加。wokerInfo.getHostNameId()爲:Hostname+「」+TaskId。 Task1和Task2 (Task 0是master) 建立的子節點以下:設計
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1 /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2
Master 在checkWorkers()方法中,在While死循環中(實際有超時限制),經過調用getAllWorkerInfos()方法來獲取_workerHealthyDir目錄下的子節點,而後比較子節點數目是否達到maxWorkers(啓動job時定義的,-w參數)。code
若小於maxWorkers,則繼續調用getAllWorkerInfos()方法進行下一輪檢測;若等於maxWorker,退出While循環,而後返回healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] 。blog
問題:因爲在分佈式環境中,每一個Worker和Maste都是並行運行,彼此不知道對方的運行狀況。上述第3步驟中,若還有子節點尚未建立,就一直在while死循環中調用來檢測getAllWorkerInfos()方法檢測,效率比較低下,固然也比較笨!
Giraph借用ZooKeeper來高效的進行檢測。設計理念以下:
若某個task建立了子節點後,就會觸發Watcher事件。
若子節點數目小於maxWorkers,就調用 workerHealthRegistrationChanged的await()方法釋放當前線程的鎖,陷入等待狀態。不會進行無用的檢測。
說明:workerHealthRegistrationChanged爲PredicateLock類型(implements BspEvent接口),PredicateLock裏面使用可重入鎖 ReentrantLock和Condition進行線程的控制。
當某個task建立了子節點後,觸發Watcher事件。
調用BspService中的public final void Process(WatchedEvent event)事件,該方法根據事件的路徑來激活相應的BspEvent事件。此處對應的是:
實驗運行以下:
s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy )
這樣就會激活master線程,開始下一輪檢測。
子節點數目等於maxWorkers時,就中止。
總結:每建立一個子節點時,纔會進行一次檢測,效率較高!