Spark源碼之部署模式

Spark 多種部署模式,如Yarn,Standalone,Local等等。主節點啓動deploy.master,從節點啓動deploy.worker。數組

    1. Worker的主要流程
      1. 啓動時

發送RegisterWorker消息給Master。若是master回覆註冊成功,則設置master,並啓動心跳。最後將executors的狀態報告給master。若是註冊失敗,則退出。app

      1. Worker處理消息

master給worker傳遞的消息:函數

消息名稱spa

說明對象

RegisterWorkerResponse事件

回覆給worker的註冊結果消息,上面已說明內存

SendHeartbeatci

心跳,傳遞worker的狀態資源

WorkDirCleanuprem

清除worker的工做目錄

MasterChanged

更改master,worker從新設置,並將executors的狀態從新通知給新的master

ReconnectWorker

從新註冊worker

LaunchExecutor

啓動新的executor,建立ExecutorRunner對象,並通知master

KillExecutor

殺死Executor

LaunchDriver

建立DriverRunner,啓動

 

 

 

      1. Master處理消息

master消息主要是Executor、Worker、Application和Driver相關的消息:

消息名稱

說明

ElectedLeader

選舉leader

RegisterWorker

新的worker註冊進來。註冊成功則發送RegisteredWorker消息給worker

RegisterApplication

註冊app,包含driver和描述信息等。從driver啓動spark task。

只有StandaloneAppClient會發送RegisterApplication消息。

schedule

調度主方法

主要目的判斷是否有worker知足啓動executor的條件,若是知足,則發送LaunchExecutor消息給worker,同時發送ExecutorAdded消息給Driver。

ExecutorStateChanged

executor狀態更新,有多是executor中止,須要維護狀態,並判斷application狀態是否要更新,通知Driver該事件。

DriverStateChanged

判斷是否調用RemoveDriver方法

Heartbeat

判斷是否從新註冊worker

RequestSubmitDriver

建立Driver,調用schedule開始調度。由deploy的submit過程提交該消息。

RequestExecutors

請求Executor,要通知指定的worker建立Executor?

KillExecutors

刪除Executor

 

KillExecutor

刪除Executor,從指定的worker刪除executor,同時通知消息給對應worker。

private def killExecutor(exec: ExecutorDesc): Unit = {

    exec.worker.removeExecutor(exec)

    exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))

    exec.state = ExecutorState.KILLED

  }

 

registerWorker

註冊Worker,建立一個新的Worker對象,設置該Worker的cpus和內存,保存到master的數組中。

 

schedule

重要的調度函數。每次資源信息或者Executor狀態等更新時觸發該方法執行新的調度。

調度是指尋找等待的driver,爲每一個等待的driver尋找空閒cpu和內存數量都知足該driver需求的worker,而後啓動driver,向worker發送啓動driver消息在該worker上啓動driver,啓動worker上的executor。

爲每一個app,在可用的worker上分配executor數組。

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {

    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

    worker.addExecutor(exec)

    worker.endpoint.send(LaunchExecutor(masterUrl,

      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))

    exec.application.driver.send(

      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))

  }

worker發送LaunchExecutor消息,同時向appdriver發送executorAdded消息。

注意有兩個分配:一是driver的啓動分配,須要判斷workercpu和內存是否知足條件;二是executor的分配,也要判斷worker的剩餘cpu和內存是否知足。

相關文章
相關標籤/搜索