Spark 多種部署模式,如Yarn,Standalone,Local等等。主節點啓動deploy.master,從節點啓動deploy.worker。數組
發送RegisterWorker消息給Master。若是master回覆註冊成功,則設置master,並啓動心跳。最後將executors的狀態報告給master。若是註冊失敗,則退出。app
master給worker傳遞的消息:函數
消息名稱spa |
說明對象 |
RegisterWorkerResponse事件 |
回覆給worker的註冊結果消息,上面已說明內存 |
SendHeartbeatci |
心跳,傳遞worker的狀態資源 |
WorkDirCleanuprem |
清除worker的工做目錄 |
MasterChanged |
更改master,worker從新設置,並將executors的狀態從新通知給新的master |
ReconnectWorker |
從新註冊worker |
LaunchExecutor |
啓動新的executor,建立ExecutorRunner對象,並通知master |
KillExecutor |
殺死Executor |
LaunchDriver |
建立DriverRunner,啓動 |
|
|
master消息主要是Executor、Worker、Application和Driver相關的消息:
消息名稱 |
說明 |
ElectedLeader |
選舉leader |
RegisterWorker |
新的worker註冊進來。註冊成功則發送RegisteredWorker消息給worker |
RegisterApplication |
註冊app,包含driver和描述信息等。從driver啓動spark task。 只有StandaloneAppClient會發送RegisterApplication消息。 |
schedule |
調度主方法 主要目的判斷是否有worker知足啓動executor的條件,若是知足,則發送LaunchExecutor消息給worker,同時發送ExecutorAdded消息給Driver。 |
ExecutorStateChanged |
executor狀態更新,有多是executor中止,須要維護狀態,並判斷application狀態是否要更新,通知Driver該事件。 |
DriverStateChanged |
判斷是否調用RemoveDriver方法 |
Heartbeat |
判斷是否從新註冊worker |
RequestSubmitDriver |
建立Driver,調用schedule開始調度。由deploy的submit過程提交該消息。 |
RequestExecutors |
請求Executor,要通知指定的worker建立Executor? |
KillExecutors |
刪除Executor |
刪除Executor,從指定的worker刪除executor,同時通知消息給對應worker。
private def killExecutor(exec: ExecutorDesc): Unit = {
exec.worker.removeExecutor(exec)
exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
exec.state = ExecutorState.KILLED
}
註冊Worker,建立一個新的Worker對象,設置該Worker的cpus和內存,保存到master的數組中。
重要的調度函數。每次資源信息或者Executor狀態等更新時觸發該方法執行新的調度。
調度是指尋找等待的driver,爲每一個等待的driver尋找空閒cpu和內存數量都知足該driver需求的worker,而後啓動driver,向worker發送啓動driver消息在該worker上啓動driver,啓動worker上的executor。
爲每一個app,在可用的worker上分配executor數組。
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
向worker發送LaunchExecutor消息,同時向app的driver發送executorAdded消息。
注意有兩個分配:一是driver的啓動分配,須要判斷worker的cpu和內存是否知足條件;二是executor的分配,也要判斷worker的剩餘cpu和內存是否知足。