Flink on YARN(上):一張圖輕鬆掌握基礎架構與啓動流程

做者:楊弢(搏遠)java

Flink 支持 Standalone 獨立部署和 YARN、Kubernetes、Mesos 等集羣部署模式,其中 YARN 集羣部署模式在國內的應用愈來愈普遍。Flink 社區將推出 Flink on YARN 應用解讀系列文章,分爲上、下兩篇。本文基於 FLIP-6 重構後的資源調度模型將介紹 Flink on YARN 應用啓動全流程,並進行詳細步驟解析。下篇將根據社區大羣反饋,解答客戶端和Flink Cluster的常見問題,分享相關問題的排查思路。node

Flink on YARN 流程圖

Flink on YARN集羣部署模式涉及YARN和Flink兩大開源框架,應用啓動流程的不少環節交織在一塊兒,爲了便於你們理解,在一張圖上畫出了Flink on YARN基礎架構和應用啓動全流程,並對關鍵角色和流程進行了介紹說明,整個啓動流程又被劃分紅客戶端提交(流程標註爲紫色)、Flink Cluster啓動和Job提交運行(流程標註爲橙色)兩個階段分別闡述,因爲分支和細節太多,本文會忽略掉一些,只介紹關鍵流程(基於Flink開源1.9版本源碼整理)。git

Flink on YARN 全流程圖.png

客戶端提交流程

1.執行命令:bin/flink run -d -m yarn-cluster ...或bin/yarn-session.sh ...來提交per-job運行模式或session運行模式的應用;github

2.解析命令參數項並初始化,啓動指定運行模式,若是是per-job運行模式將根據命令行參數指定的Job主類建立job graph;apache

  • 若是能夠從命令行參數(-yid <APPLICATION_ID>)或YARN properties臨時文件({java.io.tmpdir}/.yarn-properties-{user.name})中獲取應用ID,向指定的應用提交Job;
  • 不然當命令行參數中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集羣模式),啓動per-job運行模式;
  • 不然當命令行參數項不包含 -yq(表示查詢YARN集羣可用資源)時,啓動session運行模式;

3.獲取YARN集羣信息、新應用ID並啓動運行前檢查;微信

  • 經過YarnClient向YARN ResourceManager(下文縮寫爲:YARN RM,YARN Master節點,負責整個集羣資源的管理和調度)請求建立一個新應用(YARN RM收到建立應用請求後生成新應用ID和container申請的資源上限後返回),而且獲取YARN Slave節點報告(YARN RM返回所有slave節點的ID、狀態、rack、http地址、總資源、已使用資源等信息);session

  • 運行前檢查:(1) 簡單驗證YARN集羣可否訪問;(2) 最大node資源可否知足flink JobManager/TaskManager vcores資源申請需求;(3) 指定queue是否存在(不存在也只是打印WARN信息,後續向YARN提交時排除異常並退出);(4)當預期應用申請的Container資源會超出YARN資源限制時拋出異常並退出;(5) 當預期應用申請不能被知足時(例如總資源超出YARN集羣可用資源總量、Container申請資源超出NM可用資源最大值等)提供一些參考信息。數據結構

4.將應用配置(flink-conf.yaml、logback.xml、log4j.properties)和相關文件(flink jars、ship files、user jars、job graph等)上傳至分佈式存儲(例如HDFS)的應用暫存目錄(/user/${user.name}/.flink/);架構

5.準備應用提交上下文(ApplicationSubmissionContext,包括應用的名稱、類型、隊列、標籤等信息和應用Master的container的環境變量、classpath、資源大小等),註冊處理部署失敗的shutdown hook(清理應用對應的HDFS目錄),而後經過YarnClient向YARN RM提交應用;app

6.循環等待直到應用狀態爲RUNNING,包含兩個階段:

  • 循環等待應用提交成功(SUBMITTED):默認每隔200ms經過YarnClient獲取應用報告,若是應用狀態不是NEW和NEW_SAVING則認爲提交成功並退出循環,每循環10次會將當前的應用狀態輸出至日誌:"Application submission is not finished, submitted application <APPLICATION_ID> is still in <APP_STATE>",提交成功後輸出日誌:"Submitted application <APPLICATION_ID>"

  • 循環等待應用正常運行(RUNNING):每隔250ms經過YarnClient獲取應用報告,每輪循環也會將當前的應用狀態輸出至日誌:"Deploying cluster, current state <APP_STATE>"。應用狀態成功變爲RUNNING後將輸出日誌"YARN application has been deployed successfully." 並退出循環,若是等到的是非預期狀態如FAILED/FINISHED/KILLED,就會在輸出YARN返回的診斷信息("The YARN application unexpectedly switched to state <APP_STATE> during deployment. Diagnostics from YARN: ...")以後拋出異常並退出。

Flink Cluster啓動流程

1.YARN RM中的ClientRMService(爲普通用戶提供的RPC服務組件,處理來自客戶端的各類RPC請求,好比查詢YARN集羣信息,提交、終止應用等)接收到應用提交請求,簡單校驗後將請求轉交給RMAppManager(YARN RM內部管理應用生命週期的組件);

2.RMAppManager根據應用提交上下文內容建立初始狀態爲NEW的應用,將應用狀態持久化到RM狀態存儲服務(例如ZooKeeper集羣,RM狀態存儲服務用來保證RM重啓、HA切換或發生故障後集羣應用可以正常恢復,後續流程中的涉及狀態存儲時再也不贅述),應用狀態變爲NEW_SAVING;

**3.應用狀態存儲完成後,應用狀態變爲SUBMITTED;**RMAppManager開始向ResourceScheduler(YARN RM可拔插資源調度器,YARN自帶三種調度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最普遍,FifoScheduler功能最簡單基本不可用,今年社區已明確再也不繼續支持FairScheduler,建議已有用戶遷至CapacityScheduler)提交應用,若是沒法正常提交(例如隊列不存在、不是葉子隊列、隊列已停用、超出隊列最大應用數限制等)則拋出拒絕該應用,應用狀態先變爲FINAL_SAVING觸發應用狀態存儲流程並在完成後變爲FAILED;若是提交成功,應用狀態變爲ACCEPTED;

4.開始建立應用運行實例(ApplicationAttempt,因爲一次運行實例中最重要的組件是ApplicationMaster,下文簡稱AM,它的狀態表明瞭ApplicationAttempt的當前狀態,因此ApplicationAttempt實際也表明了AM),初始狀態爲NEW;

**5.初始化應用運行實例信息,**並向ApplicationMasterService(AM&RM協議接口服務,處理來自AM的請求,主要包括註冊和心跳)註冊,應用實例狀態變爲SUBMITTED;

6.RMAppManager維護的應用實例開始初始化AM資源申請信息並從新校驗隊列,而後向ResourceScheduler申請AM Container(Container是YARN中資源的抽象,包含了內存、CPU等多維度資源),應用實例狀態變爲ACCEPTED;

7.ResourceScheduler會根據優先級(隊列/應用/請求每一個維度都有優先級配置)從根隊列開始層層遞進,前後選擇當前優先級最高的子隊列、應用直至具體某個請求,而後結合集羣資源分佈等狀況做出分配決策,AM Container分配成功後,應用實例狀態變爲ALLOCATED_SAVING,並觸發應用實例狀態存儲流程,存儲成功後應用實例狀態變爲ALLOCATED;

8.RMAppManager維護的應用實例開始通知ApplicationMasterLauncher(AM生命週期管理服務,負責啓動或清理AM container)啓動AM container,ApplicationMasterLauncher與YARN NodeManager(下文簡稱YARN NM,與YARN RM保持通訊,負責管理單個節點上的所有資源、Container生命週期、附屬服務等,監控節點健康情況和Container資源使用)創建通訊並請求啓動AM container;

9.ContainerManager(YARN NM核心組件,管理全部Container的生命週期)接收到AM container啓動請求,YARN NM開始校驗Container Token及資源文件,建立應用實例和Container實例並存儲至本地,結果返回後應用實例狀態變爲LAUNCHED;

10.ResourceLocalizationService(資源本地化服務,負責Container所需資源的本地化。它可以按照描述從HDFS上下載Container所需的文件資源,並儘可能將它們分攤到各個磁盤上以防止出現訪問熱點)初始化各類服務組件、建立工做目錄、從HDFS下載運行所需的各類資源至Container工做目錄(路徑爲: {yarn.nodemanager.local-dirs}/usercache/{user}/appcache/<APPLICATION_ID>/<CONTAINER_ID>);

11.ContainersLauncher(負責container的具體操做,包括啓動、重啓、恢復和清理等)將待運行Container所需的環境變量和運行命令寫到Container工做目錄下的launch_container.sh腳本中,而後運行該腳本啓動Container;

12.Container進程加載並運行ClusterEntrypoint(Flink JobManager入口類,每種集羣部署模式和應用運行模式都有相應的實現,例如在YARN集羣部署模式下,per-job應用運行模式實現類是YarnJobClusterEntrypoint,session應用運行模式實現類是YarnSessionClusterEntrypoint),首先初始化相關運行環境:

  • 輸出各軟件版本及運行環境信息、命令行參數項、classpath等信息;
  • 註冊處理各類SIGNAL的handler:記錄到日誌
  • 註冊JVM關閉保障的shutdown hook:避免JVM退出時被其餘shutdown hook阻塞
  • 打印YARN運行環境信息:用戶名
  • 從運行目錄中加載flink conf
  • 初始化文件系統
  • 建立並啓動各種內部服務(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)
  • 將RPC address和port更新到flink conf配置

13.啓動ResourceManager(Flink資源管理核心組件,包含YarnResourceManager和SlotManager兩個子組件,YarnResourceManager負責外部資源管理,與YARN RM創建通訊並保持心跳,申請或釋放TaskManager資源,註銷應用等;SlotManager則負責內部資源管理,維護所有Slot信息和狀態)及相關服務,建立異步AMRMClient,開始註冊AM,註冊成功後每隔一段時間(心跳間隔配置項:${yarn.heartbeat.interval},默認5s)向YARN RM發送心跳來發送資源更新請求和接受資源變動結果。YARN RM內部該應用和應用運行實例的狀態都變爲RUNNING,並通知AMLivelinessMonitor服務監控AM是否存活狀態,小心跳超過必定時間(默認10分鐘)觸發AM failover流程;

14.啓動Dispatcher(負責接收用戶提供的做業,而且負責爲這個新提交的做業拉起一個新的 JobManager)及相關服務(包括REST endpoint等),在per-job運行模式下,Dispatcher將直接從Container工做目錄加載JobGraph文件;在session運行模式下,Dispatcher將在接收客戶端提交的Job(_經過BlockServer接收job graph文件)後再進行後續流程;

15.根據JobGraph啓動JobManager(負責做業調度、管理Job和Task的生命週期),構建ExecutionGraph(JobGraph的並行化版本,調度層最核心的數據結構);

16.JobManager開始執行ExecutionGraph,向ResourceManager申請資源;

17.ResourceManager將資源請求加入等待請求隊列,並經過心跳向YARN RM申請新的Container資源來啓動TaskManager進程;後續流程若是有空閒Slot資源,SlotManager將其分配給等待請求隊列中匹配的請求,不用再經過18. YarnResourceManager申請新的TaskManager;

**18.YARN ApplicationMasterService接收到資源請求後,解析出新的資源請求並更新應用請求信息; **

19.YARN ResourceScheduler成功爲該應用分配資源後更新應用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳時返回新分配資源信息;

20.Flink ResourceManager接收到新分配的Container資源後,準備好TaskManager啓動上下文(ContainerLauncherContext,生成TaskManager配置並上傳至分佈式存儲,配置其餘依賴和環境變量等),而後向YARN NM申請啓動TaskManager進程,YARN NM啓動Container的流程與AM Container啓動流程基本相似,區別在於應用實例在NM上已存在並未RUNNING狀態時則跳過應用實例初始化流程,這裏再也不贅述;

21.TaskManager進程加載並運行YarnTaskExecutorRunner(Flink TaskManager入口類),初始化流程完成後啓動TaskExecutor(負責執行Task相關操做);

22.TaskExecutor啓動後先向ResourceManager註冊,成功後再向SlotManager彙報本身的Slot資源與狀態; SlotManager接收到Slot空閒資源後主動觸發Slot分配,從等待請求隊列中選出合適的資源請求後,向 TaskManager請求該Slot資源

23.TaskManager收到請求後檢查該Slot是否可分配(不存在則返回異常信息)、Job是否已註冊(沒有則先註冊再分配Slot),檢查經過後將Slot分配給JobManager;

24.JobManager檢查Slot分配是否重複,經過後通知Execution執行部署task流程,向TaskExecutor提交task; TaskExecutor啓動新的線程運行Task。

參考資料

Flink Release-1.9 SourceCode Flink Release-1.9 Documents FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc. YARN 3.2 SourceCode YARN 3.2.0 Documents


▼ Apache Flink 社區推薦 ▼

Apache Flink 及大數據領域盛會 Flink Forward Asia 2019 將於 11月28-30日在北京舉辦,阿里、騰訊、美團、字節跳動、百度、英特爾、DellEMC、Lyft、Netflix 及 Flink 創始團隊等近 30 家知名企業資深技術專家齊聚國際會議中心,與全球開發者共同探討大數據時代核心技術與開源生態。瞭解更多精彩議程請點擊:

developer.aliyun.com/special/ffa…

Flink 社區公衆號後臺回覆「門票」,少許免費門票搶先拿。

Flink 社區官方微信公衆號

Ververica公衆號二維碼.jpg
相關文章
相關標籤/搜索