Flink 支持 Standalone 獨立部署和 YARN、Kubernetes、Mesos 等集羣部署模式,其中 YARN 集羣部署模式在國內的應用愈來愈普遍。Flink 社區將推出 Flink on YARN 應用解讀系列文章,分爲上、下兩篇。本文基於 FLIP-6 重構後的資源調度模型將介紹 Flink on YARN 應用啓動全流程,並進行詳細步驟解析。下篇將根據社區大羣反饋,解答客戶端和Flink Cluster的常見問題,分享相關問題的排查思路。java
Flink on YARN集羣部署模式涉及YARN和Flink兩大開源框架,應用啓動流程的不少環節交織在一塊兒,爲了便於你們理解,在一張圖上畫出了Flink on YARN基礎架構和應用啓動全流程,並對關鍵角色和流程進行了介紹說明,整個啓動流程又被劃分紅客戶端提交(流程標註爲紫色)、Flink Cluster啓動和Job提交運行(流程標註爲橙色)兩個階段分別闡述,因爲分支和細節太多,本文會忽略掉一些,只介紹關鍵流程(基於Flink開源1.9版本源碼整理)。node
1.執行命令:bin/flink run -d -m yarn-cluster ...或bin/yarn-session.sh ...來提交per-job運行模式或session運行模式的應用;session
2.解析命令參數項並初始化,啓動指定運行模式,若是是per-job運行模式將根據命令行參數指定的Job主類建立job graph;數據結構
3.獲取YARN集羣信息、新應用ID並啓動運行前檢查;架構
4.將應用配置(flink-conf.yaml、logback.xml、log4j.properties)和相關文件(flink jars、ship files、user jars、job graph等)上傳至分佈式存儲(例如HDFS)的應用暫存目錄(/user/${user.name}/.flink/);app
5.準備應用提交上下文(ApplicationSubmissionContext,包括應用的名稱、類型、隊列、標籤等信息和應用Master的container的環境變量、classpath、資源大小等),註冊處理部署失敗的shutdown hook(清理應用對應的HDFS目錄),而後經過YarnClient向YARN RM提交應用;框架
6.循環等待直到應用狀態爲RUNNING,包含兩個階段:異步
1.YARN RM中的ClientRMService(爲普通用戶提供的RPC服務組件,處理來自客戶端的各類RPC請求,好比查詢YARN集羣信息,提交、終止應用等)接收到應用提交請求,簡單校驗後將請求轉交給RMAppManager(YARN RM內部管理應用生命週期的組件);分佈式
2.RMAppManager根據應用提交上下文內容建立初始狀態爲NEW的應用,將應用狀態持久化到RM狀態存儲服務(例如ZooKeeper集羣,RM狀態存儲服務用來保證RM重啓、HA切換或發生故障後集羣應用可以正常恢復,後續流程中的涉及狀態存儲時再也不贅述),應用狀態變爲NEW_SAVING;spa
3.應用狀態存儲完成後,應用狀態變爲SUBMITTED;RMAppManager開始向ResourceScheduler(YARN RM可拔插資源調度器,YARN自帶三種調度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最普遍,FifoScheduler功能最簡單基本不可用,今年社區已明確再也不繼續支持FairScheduler,建議已有用戶遷至CapacityScheduler)提交應用,若是沒法正常提交(例如隊列不存在、不是葉子隊列、隊列已停用、超出隊列最大應用數限制等)則拋出拒絕該應用,應用狀態先變爲FINAL_SAVING觸發應用狀態存儲流程並在完成後變爲FAILED;若是提交成功,應用狀態變爲ACCEPTED;
4.開始建立應用運行實例(ApplicationAttempt,因爲一次運行實例中最重要的組件是ApplicationMaster,下文簡稱AM,它的狀態表明瞭ApplicationAttempt的當前狀態,因此ApplicationAttempt實際也表明了AM),初始狀態爲NEW;
5.初始化應用運行實例信息,並向ApplicationMasterService(AM&RM協議接口服務,處理來自AM的請求,主要包括註冊和心跳)註冊,應用實例狀態變爲SUBMITTED;
6.RMAppManager維護的應用實例開始初始化AM資源申請信息並從新校驗隊列,而後向ResourceScheduler申請AM Container(Container是YARN中資源的抽象,包含了內存、CPU等多維度資源),應用實例狀態變爲ACCEPTED;
7.ResourceScheduler會根據優先級(隊列/應用/請求每一個維度都有優先級配置)從根隊列開始層層遞進,前後選擇當前優先級最高的子隊列、應用直至具體某個請求,而後結合集羣資源分佈等狀況做出分配決策,AM Container分配成功後,應用實例狀態變爲ALLOCATED_SAVING,並觸發應用實例狀態存儲流程,存儲成功後應用實例狀態變爲ALLOCATED;
8.RMAppManager維護的應用實例開始通知ApplicationMasterLauncher(AM生命週期管理服務,負責啓動或清理AM container)啓動AM container,ApplicationMasterLauncher與YARN NodeManager(下文簡稱YARN NM,與YARN RM保持通訊,負責管理單個節點上的所有資源、Container生命週期、附屬服務等,監控節點健康情況和Container資源使用)創建通訊並請求啓動AM container;
9.ContainerManager(YARN NM核心組件,管理全部Container的生命週期)接收到AM container啓動請求,YARN NM開始校驗Container Token及資源文件,建立應用實例和Container實例並存儲至本地,結果返回後應用實例狀態變爲LAUNCHED;
10.ResourceLocalizationService(資源本地化服務,負責Container所需資源的本地化。它可以按照描述從HDFS上下載Container所需的文件資源,並儘可能將它們分攤到各個磁盤上以防止出現訪問熱點)初始化各類服務組件、建立工做目錄、從HDFS下載運行所需的各類資源至Container工做目錄(路徑爲: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);
11.ContainersLauncher(負責container的具體操做,包括啓動、重啓、恢復和清理等)將待運行Container所需的環境變量和運行命令寫到Container工做目錄下的launch_container.sh腳本中,而後運行該腳本啓動Container;
12.Container進程加載並運行ClusterEntrypoint(Flink JobManager入口類,每種集羣部署模式和應用運行模式都有相應的實現,例如在YARN集羣部署模式下,per-job應用運行模式實現類是YarnJobClusterEntrypoint,session應用運行模式實現類是YarnSessionClusterEntrypoint),首先初始化相關運行環境:
13.啓動ResourceManager(Flink資源管理核心組件,包含YarnResourceManager和SlotManager兩個子組件,YarnResourceManager負責外部資源管理,與YARN RM創建通訊並保持心跳,申請或釋放TaskManager資源,註銷應用等;SlotManager則負責內部資源管理,維護所有Slot信息和狀態)及相關服務,建立異步AMRMClient,開始註冊AM,註冊成功後每隔一段時間(心跳間隔配置項:${yarn.heartbeat.interval},默認5s)向YARN RM發送心跳來發送資源更新請求和接受資源變動結果。YARN RM內部該應用和應用運行實例的狀態都變爲RUNNING,並通知AMLivelinessMonitor服務監控AM是否存活狀態,小心跳超過必定時間(默認10分鐘)觸發AM failover流程;
14.啓動Dispatcher(負責接收用戶提供的做業,而且負責爲這個新提交的做業拉起一個新的 JobManager)及相關服務(包括REST endpoint等),在per-job運行模式下,Dispatcher將直接從Container工做目錄加載JobGraph文件;在session運行模式下,Dispatcher將在接收客戶端提交的Job(_經過BlockServer接收job graph文件)後再進行後續流程;
15.根據JobGraph啓動JobManager(負責做業調度、管理Job和Task的生命週期),構建ExecutionGraph(JobGraph的並行化版本,調度層最核心的數據結構);
16.JobManager開始執行ExecutionGraph,向ResourceManager申請資源;
17.ResourceManager將資源請求加入等待請求隊列,並經過心跳向YARN RM申請新的Container資源來啓動TaskManager進程;後續流程若是有空閒Slot資源,SlotManager將其分配給等待請求隊列中匹配的請求,不用再經過18. YarnResourceManager申請新的TaskManager;
18.YARN ApplicationMasterService接收到資源請求後,解析出新的資源請求並更新應用請求信息;
19.YARN ResourceScheduler成功爲該應用分配資源後更新應用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳時返回新分配資源信息;
20.Flink ResourceManager接收到新分配的Container資源後,準備好TaskManager啓動上下文(ContainerLauncherContext,生成TaskManager配置並上傳至分佈式存儲,配置其餘依賴和環境變量等),而後向YARN NM申請啓動TaskManager進程,YARN NM啓動Container的流程與AM Container啓動流程基本相似,區別在於應用實例在NM上已存在並未RUNNING狀態時則跳過應用實例初始化流程,這裏再也不贅述;
21.TaskManager進程加載並運行YarnTaskExecutorRunner(Flink TaskManager入口類),初始化流程完成後啓動TaskExecutor(負責執行Task相關操做);
22.TaskExecutor啓動後先向ResourceManager註冊,成功後再向SlotManager彙報本身的Slot資源與狀態;
SlotManager接收到Slot空閒資源後主動觸發Slot分配,從等待請求隊列中選出合適的資源請求後,向
TaskManager請求該Slot資源
23.TaskManager收到請求後檢查該Slot是否可分配(不存在則返回異常信息)、Job是否已註冊(沒有則先註冊再分配Slot),檢查經過後將Slot分配給JobManager;
24.JobManager檢查Slot分配是否重複,經過後通知Execution執行部署task流程,向TaskExecutor提交task;
TaskExecutor啓動新的線程運行Task。
本文爲雲棲社區原創內容,未經容許不得轉載。