1. 前言
Flink提供了兩種在yarn上運行的模式,分別爲Session-Cluster和Per-Job-Cluster模式,本文分析兩種模式及啓動流程。html
下圖展現了Flink-On-Yarn模式下涉及到的相關類圖結構session


2. Session-Cluster模式
Session-Cluster模式須要先啓動集羣,而後再提交做業,接着會向yarn申請一塊空間後,資源永遠保持不變。若是資源滿了,下一個做業就沒法提交,只能等到yarn中的其中一個做業執行完成後,釋放了資源,下個做業纔會正常提交。全部做業共享Dispatcher和ResourceManager;共享資源;適合規模小執行時間短的做業。app

2.1. 啓動集羣
運行bin/yarn-session.sh
便可默認啓動包含一個TaskManager(內存大小爲1024MB,包含一個Slot)、一個JobMaster(內存大小爲1024MB),固然能夠經過指定參數控制集羣的資源,如**-n指定TaskManager個數,-s**指定每一個TaskManager中Slot的個數;其餘配置項,可參考spa
下面以bin/yarn-session.sh爲例,分析Session-Cluster啓動流程。rest
2.2. 流程分析
下面分爲本地和遠程分析啓動流程,其中本地表示在客戶端的啓動流程,遠端則表示經過Yarn拉起Container的流程;code
2.2.1 本地流程
- Session啓動入口爲FlinkYarnSessionCli#main
- 根據傳入的參數肯定集羣的資源信息(如多少個TaskManager,Slot等)
- 部署集羣AbstractYarnClusterDescriptor#deploySessionCluster -> AbstractYarnClusterDescriptor#deployInternal
- 進行資源校驗(如內存大小、vcore大小、隊列)
- 經過YarnClient建立Application
- 再次校驗資源
- AbstractYarnClusterDescriptor#startAppMaster啓動AppMaster
- 初始化文件系統(HDFS)
- 將log4j、logback、flink-conf.yaml、jar包上傳至HDFS
- 構造AppMaster的Container(肯定Container進程的入口類YarnSessionClusterEntrypoint),構造相應的Env
- YarnClient向Yarn提交Container申請
- 跟蹤ApplicationReport狀態(肯定是否啓動成功,可能會因爲資源不夠,一直等待)
- 啓動成功後將對應的ip和port寫入flinkConfiguration中
- 建立與將集羣交互的ClusterClient
- 根據flink-conf的HA配置建立對應的服務(如StandaloneHaServices、ZooKeeperHaServices等)
- 建立基於Netty的RestClient;
- 建立/rest_server_lock、/dispatcher_lock節點(以ZK爲例)
- 啓動監聽節點的變化(主備切換)
- 經過ClusterClient獲取到appId信息並寫入本地臨時文件
通過上述步驟,整個客戶端的啓動流程就結束了,下面分析yarn拉起Session集羣的流程,入口類在申請Container時指定爲YarnSessionClusterEntrypoint。server
2.2.2 遠端流程
2.3. 啓動任務
當啓動集羣后,便可使用./flink run -c mainClass /path/to/user/jar
向集羣提交任務。
2.4 流程分析
一樣,下面分爲本地和遠程分析啓動流程,其中本地表示在客戶端提交任務流程,遠端則表示集羣收到任務後的處理流程。
2.4.1 本地流程
- 程序入口爲CliFrontend#main
- 解析處理參數
- 根據用戶jar、main、程序參數、savepoint信息生成PackagedProgram
- 獲取session集羣信息
- 執行用戶程序
- 設置ClassLoader
- 設置Context
- 執行用戶程序main方法(當執行用戶業務邏輯代碼時,會解析出StreamGraph而後經過ClusterClient#run來提交任務),其流程以下:
- 獲取任務的JobGraph
- 經過RestClusterClient#submitJob提交任務
- 建立本地臨時文件存儲JobGraph
- 經過RestClusterClient向集羣的rest接口提交任務
- 處理請求響應結果
- 重置Context
- 重置ClassLoader
通過上述步驟,客戶端提交任務過程就完成了,主要就是經過RestClusterClient將用戶程序的JobGraph經過Rest接口提交至集羣中。
2.4.2 遠端流程
遠端響應任務提交請求的是RestServerEndpoint,其包含了多個Handler,其中JobSubmitHandler用來處理任務提交的請求;
- 處理請求入口:JobSubmitHandler#handleRequest
- 進行相關校驗
- 從文件中讀取出JobGraph
- 經過BlobClient將jar及JobGraph文件上傳至BlobServer中
- 經過Dispatcher#submitJob提交JobGraph
- 經過Dispatcher#runJob運行任務
- 建立JobManagerRunner(處理leader選舉)
- 建立JobMaster(實際執行任務入口,包含在JobManagerRunner)
- 啓動JobManagerRunner(會進行leader選舉,ZK目錄爲leader/${jobId}/job_manager_lock)
- 當爲主時會調用JobManagerRunner#grantLeadership方法
- 啓動JobMaster
- 將任務運行狀態信息寫入ZK(/${AppID}/running_job_registry/${jobId})
- 啓動JobMaster的Endpoint
- 開始調度任務JobMaster#startJobExecution
接下來就進行任務具體調度(構造ExecutionGraph、申請Slot等)流程,本篇文章再也不展開介紹。
3. Per-Job-Cluster模式
一個任務會對應一個Job,每提交一個做業會根據自身的狀況,都會單獨向yarn申請資源,直到做業執行完成,一個做業的失敗與否並不會影響下一個做業的正常提交和運行。獨享Dispatcher和ResourceManager,按需接受資源申請;適合規模大長時間運行的做業。

3.1 啓動任務
啓動Per-Job-Cluster任務,可經過./bin/flink run -m yarn-cluster -d -c mainClass /path/to/user/jar
命令使用分離模式啓動一個集羣,即單任務單集羣;
3.2. 流程分析
與Session-Cluster相似,咱們對Per-Job-Cluster模式也分爲本地和遠端。
3.2.1 本地流程
- 與Session-Cluster模式相似,入口也爲CliFrontend#main
- 解析處理參數
- 根據用戶jar、main、程序參數、savepoint信息生成PackagedProgram
- 根據PackagedProgram建立JobGraph(對於非分離模式仍是和Session模式同樣,模式Session-Cluster)
- 獲取集羣資源信息
- 部署集羣YarnClusterDesriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal;後面流程與Session-Cluster相似,值得注意的是在AbstractYarnClusterDescriptor#startAppMaster中與Session-Cluster有一個顯著不一樣的就是其會將任務的JobGraph上傳至Hdfs供後續服務端使用
通過上述步驟,客戶端提交任務過程就完成了,主要涉及到文件(JobGraph和jar包)的上傳。
3.2.2 遠端流程
- 遠端宿主在Container中的集羣入口爲YarnJobClusterEntrypoint#main
- ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster啓動集羣
- 建立JobDispatcherResourceManagerComponentFactory(用於建立JobDispatcherResourceManagerComponent)
- 建立ResourceManager(YarnResourceManager)、Dispatcher(MiniDispatcher),其中在建立MiniDispatcher時會從以前的JobGraph文件中讀取出JobGraph,並啓動進行ZK選舉
- 當爲主時會調用Dispatcher#grantLeadership方法
- Dispatcher#recoverJobs恢復任務,獲取JobGraph
- Dispatcher#tryAcceptLeadershipAndRunJobs確認獲取主並開始運行任務
- Dispatcher#runJob開始運行任務(建立JobManagerRunner並啓動進行ZK選舉),後續流程與Session-Cluster相同,再也不贅述
4. 總結
Flink提供在Yarn上兩種運行模式:Session-Cluster和Per-Job-Cluster,其中Session-Cluster的資源在啓動集羣時就定義完成,後續全部做業的提交都共享該資源,做業可能會互相影響,所以比較適合小規模短期運行的做業,對於Per-Job-Cluster而言,全部做業的提交都是單獨的集羣,做業之間的運行不受影響(可能會共享CPU計算資源),所以比較適合大規模長時間運行的做業。
原文出處:https://www.cnblogs.com/leesf456/p/11136344.html