【Flink】Flink做業調度流程分析

1. 概述

當向Flink集羣提交用戶做業時,從用戶角度看,只須要做業處理邏輯正確,輸出正確的結果便可;而不用關心做業什麼時候被調度的,做業申請的資源又是如何被分配的以及做業什麼時候會結束;可是瞭解做業在運行時的具體行爲對於咱們深刻了解Flink原理有很是大的幫助,而且對咱們如何編寫更合理的做業邏輯有指導意義,所以本文詳細分析做業的調度及資源分配以及做業的生命週期。html

2. 流程分析

基於社區master主線(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基於ZK實現分析web

Flink做業申請流程圖
上圖歸納了Flink做業從Client端提交到到Flink集羣的提交的基本流程[1]。緩存

當運行./flink run腳本提交用戶做業至Dispathcer後,Dispatcher會拉起JobManagerRunner,然後JobManagerRunner會向Zookeeper註冊競爭Leader。對於以前流程有興趣能夠參考深刻理解Flink-On-Yarn模式異步

JobManagerRunner競爭成爲Leader時,會調用JobManagerRunnerImpl#grantLeadership,此時便開始處理做業,會經過以下的代碼調用路徑啓動JobMaster。ide

  • JobManagerRunnerImpl#grantLeadership
  • JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
  • JobManagerRunnerImpl#startJobMaster。
    startJobMaster方法會首先將該做業的ID寫入對應的ZK目錄並置爲RUNNING狀態,寫入該目錄可用於在Dispathcer接收做業時,判斷該做業是否重複提交或恢復做業時使用;在JobManagerRunner調度做業時也在從ZK上拉取做業信息來判斷做業狀態,若爲DONE狀態,則無需調度。啓動JobMaster時會先啓動其RPC Endpoint,以便與其餘組件進行RPC調用,以後JobMaster便經過JobMaster#startJobExecution開始執行做業,執行做業前會有些前置校驗,如必須確保運行在主線程中;啓動JobMaster上的一些服務(組件),如TaskManager和ResourceManager的心跳管理;啓動SlotPool、Scheduler;重連至ResourceManager,而且在ZK中註冊監聽ResourceManager Leader的變化的Retriever等。
    當初始化完JobMaster上相應服務(組件)後,便開始調度,會有以下代碼調用路徑
  • JobMaster#start
  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling。

咱們知道用戶編寫的做業是以JobGraph提交到Dispatcher,可是在實際調度時會將JobGraph轉化爲ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase對象初始化的時候完成轉化,以下圖所示表示了典型的轉化過程(JobVertex與ExecutionJobVertex一一對應),而具體的轉化邏輯實現可參考如何生成ExecutionGraph及物理執行圖oop

JobGraph->ExecutionGraph

在SchedulerBase初始化時生成ExecutionGraph後,以後便基於ExecutionGraph調度,而調度基類SchedulerBase默認實現爲DefaultScheduler,會繼續經過DefaultScheduler#startSchedulingInternal調度做業,此時會將做業(ExecutionGraph)的狀態從CREATED狀態變動爲RUNNING狀態,此時在Flink web界面查看任務的狀態便已經爲RUNNING,但注意此時做業(各頂點)實際並未開始調度,頂點仍是處於CREATED狀態,任做業狀態與頂點狀態不徹底相關聯,有其各自的演化生命週期,具體可參考Flink做業調度[2];而後根據不一樣的策略EagerSchedulingStrategy(主要用於流式做業,全部頂點(ExecutionVertex)同時開始調度)和LazyFromSourcesSchedulingStrategy(主要用於批做業,從Source開始開始調度,其餘頂點延遲調度)調度。ui

當提交流式做業時,會有以下代碼調用路徑:spa

  • EagerSchedulingStrategy#startScheduling
  • EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署以前會根據待部署的ExecutionVertex生成對應的ExecutionVertexDeploymentOption,而後調用DefaultScheduler#allocateSlotsAndDeploy開始部署。一樣,在部署以前也須要進行一些前置校驗(ExecutionVertex對應的Execution的狀態必須爲CREATED),接着將待部署的ExecutionVertex對應的Execution狀態變動爲SCHEDULED,而後開始爲ExecutionVertex分配Slot。會有以下的調用代碼路徑:
  • DefaultScheduler#allocateSlots(該過程會ExecutionVertex轉化爲ExecutionVertexSchedulingRequirements,會封裝包含一些location信息、sharing信息、資源信息等)
  • DefaultExecutionSlotAllocator#allocateSlotsFor,該方法會開始逐一異步部署各ExecutionVertex,部署也是根據不一樣的Slot提供策略來分配,接着會通過以下代碼調用路徑層層轉發,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider默認實現爲SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(該方法會根據vertex是否共享slot來分配singleSlot/SharedSlot),以singleSlot爲例說明。
    在分配slot時,首先會在JobMaster中SlotPool中進行分配,具體是先SlotPool中獲取全部slot,而後嘗試選擇一個最合適的slot進行分配,這裏的選擇有兩種策略,即按照位置優先和按照以前已分配的slot優先;若從SlotPool沒法分配,則經過RPC請求向ResourceManager請求slot,若此時並未鏈接上ResourceManager,則會將請求緩存起來,待鏈接上ResourceManager後再申請。

當ResourceManager收到申請slot請求時,若發現該JobManager未註冊,則直接拋出異常;不然將請求轉發給SlotManager處理,SlotManager中維護了集羣全部空閒的slot(TaskManager會向ResourceManager上報本身的信息,在ResourceManager中由SlotManager保存Slot和TaskManager對應關係),並從其中找出符合條件的slot,而後向TaskManager發送RPC請求申請對應的slot。線程

等待全部的slot申請完成後,而後會將ExecutionVertex對應的Execution分配給對應的Slot,即從Slot中分配對應的資源給Execution,完成分配後可開始部署做業。
部署做業代碼調用路徑以下:3d

  • DefaultScheduler#waitForAllSlotsAndDeploy
  • DefaultScheduler#deployAll
  • DefaultScheduler#deployOrHandleError
  • DefaultScheduler#deployTaskSafe
  • DefaultExecutionVertexOperations#deploy
  • ExecutionVertex#deploy
  • Execution#deploy(每次調度ExecutionVertex,都會有一個Execute,在此階段會將Execution的狀態變動爲DEPLOYING狀態,而且爲該ExecutionVertex生成對應的部署描述信息,而後從對應的slot中獲取對應的TaskManagerGateway,以便向對應的TaskManager提交Task)
  • RpcTaskManagerGateway#submitTask(此時便將Task經過RPC提交給了TaskManager)。

TaskManager(TaskExecutor)在接收到提交Task的請求後,會通過一些初始化(如從BlobServer拉取文件,反序列化做業和Task信息、LibaryCacheManager等),而後這些初始化的信息會用於生成Task(Runnable對象),而後啓動該Task,其代碼調用路徑以下 Task#startTaskThread(啓動Task線程)-> Task#run(將ExecutionVertex狀態變動爲RUNNING狀態,此時在FLINK web前臺查看頂點狀態會變動爲RUNNING狀態,另外還會生成了一個AbstractInvokable對象,該對象是FLINK銜接執行用戶代碼的關鍵,然後會通過以下調用

  • AbstractInvokable#invoke(AbstractInvokable有幾個關鍵的子類實現, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。對於streaming類型的Source,會調用StreamTask#invoke)
  • StreamTask#invoke
  • StreamTask#beforeInvoke
  • StreamTask#initializeStateAndOpen(初始化狀態和進行初始化,這裏會調用用戶的open方法(如自定義實現的source))-> StreamTask#runMailboxLoop,便開始處理Source端消費的數據,並流入下游算子處理。

至此做業從提交到資源分配及調度運行總體流程就已經分析完畢,對於流式做業而言,正常狀況下其會一直運行,不會結束。

3. 總結

對於做業的運行,會先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成爲Leader後,便開始處理做業,首先會根據JobGraph生成對應的ExecutionGraph,而後開始調度,做業的狀態首先會變動爲RUNNING,而後對各ExecutionVertex申請slot,申請slot會涉及JM與RM、TM之間的通訊,當在TM上分配完slot後,即可將Task提交至TaskManager,而後TaskManager會爲每一個提交的Task生成一個單獨的線程處理。

參考

  1. https://www.infoq.cn/article/RWTM9o0SHHV3Xr8o8giT
  2. https://flink.sojb.cn/internals/job_scheduling.html
相關文章
相關標籤/搜索