【Flink】深刻理解Flink-On-Yarn模式

1. 前言

Flink提供了兩種在yarn上運行的模式,分別爲Session-Cluster和Per-Job-Cluster模式,本文分析兩種模式及啓動流程。html

下圖展現了Flink-On-Yarn模式下涉及到的相關類圖結構session

集羣啓動入口類

組件類圖結構

2. Session-Cluster模式

Session-Cluster模式須要先啓動集羣,而後再提交做業,接着會向yarn申請一塊空間後,資源永遠保持不變。若是資源滿了,下一個做業就沒法提交,只能等到yarn中的其中一個做業執行完成後,釋放了資源,下個做業纔會正常提交。全部做業共享Dispatcher和ResourceManager;共享資源;適合規模小執行時間短的做業。app

2.1. 啓動集羣

運行bin/yarn-session.sh便可默認啓動包含一個TaskManager(內存大小爲1024MB,包含一個Slot)、一個JobMaster(內存大小爲1024MB),固然能夠經過指定參數控制集羣的資源,如**-n指定TaskManager個數,-s**指定每一個TaskManager中Slot的個數;其餘配置項,可參考spa

下面以bin/yarn-session.sh爲例,分析Session-Cluster啓動流程。rest

2.2. 流程分析

下面分爲本地和遠程分析啓動流程,其中本地表示在客戶端的啓動流程,遠端則表示經過Yarn拉起Container的流程;code

2.2.1 本地流程

  • Session啓動入口爲FlinkYarnSessionCli#main
  • 根據傳入的參數肯定集羣的資源信息(如多少個TaskManager,Slot等)
  • 部署集羣AbstractYarnClusterDescriptor#deploySessionCluster -> AbstractYarnClusterDescriptor#deployInternal
    • 進行資源校驗(如內存大小、vcore大小、隊列)
    • 經過YarnClient建立Application
    • 再次校驗資源
    • AbstractYarnClusterDescriptor#startAppMaster啓動AppMaster
      • 初始化文件系統(HDFS)
      • 將log4j、logback、flink-conf.yaml、jar包上傳至HDFS
      • 構造AppMaster的Container(肯定Container進程的入口類YarnSessionClusterEntrypoint),構造相應的Env
      • YarnClient向Yarn提交Container申請
      • 跟蹤ApplicationReport狀態(肯定是否啓動成功,可能會因爲資源不夠,一直等待)
    • 啓動成功後將對應的ip和port寫入flinkConfiguration中
    • 建立與將集羣交互的ClusterClient
      • 根據flink-conf的HA配置建立對應的服務(如StandaloneHaServices、ZooKeeperHaServices等)
      • 建立基於Netty的RestClient;
      • 建立/rest_server_lock、/dispatcher_lock節點(以ZK爲例)
      • 啓動監聽節點的變化(主備切換)
    • 經過ClusterClient獲取到appId信息並寫入本地臨時文件

通過上述步驟,整個客戶端的啓動流程就結束了,下面分析yarn拉起Session集羣的流程,入口類在申請Container時指定爲YarnSessionClusterEntrypoint。server

2.2.2 遠端流程

  • 遠端宿主在Container中的集羣入口爲YarnSessionClusterEntrypoint#mainhtm

  • ClusterEntrypoint #runClusterEntrypoint -> ClusterEntrypoint#startCluster啓動集羣blog

    • 初始化文件系統
    • 初始化各類Service(如:建立RpcService(AkkaRpcService)、建立HAService、建立並啓動BlobServer、建立HeartbeatServices、建立指標服務並啓動、建立本地存儲ExecutionGraph的Store)
    • 建立DispatcherResourceManagerComponentFactory(SessionDispatcherResourceManagerComponentFactory),用於建立DispatcherResourceManagerComponent(用於啓動Dispatcher、ResourceManager、WebMonitorEndpoint)
    • 經過DispatcherResourceManagerComponentFactory建立DispatcherResourceManagerComponent
      • 建立/dispatcher_lock節點,/resource_manager_lock節點
      • 建立DispatcherGateway、ResourceManagerGateway的Retriever(用於建立RpcGateway)
      • 建立DispatcherGateway的WebMonitorEndpoint並啓動
      • 建立JobManager的指標組
      • 建立ResourceManager、Dispatcher並啓動進行ZK選舉
      • 返回SessionDispatcherResourceManagerComponent

    通過上述步驟就完成了集羣的啓動;接口

2.3. 啓動任務

當啓動集羣后,便可使用./flink run -c mainClass /path/to/user/jar向集羣提交任務。

2.4 流程分析

一樣,下面分爲本地和遠程分析啓動流程,其中本地表示在客戶端提交任務流程,遠端則表示集羣收到任務後的處理流程。

2.4.1 本地流程

  • 程序入口爲CliFrontend#main
  • 解析處理參數
  • 根據用戶jar、main、程序參數、savepoint信息生成PackagedProgram
  • 獲取session集羣信息
  • 執行用戶程序
    • 設置ClassLoader
    • 設置Context
    • 執行用戶程序main方法(當執行用戶業務邏輯代碼時,會解析出StreamGraph而後經過ClusterClient#run來提交任務),其流程以下:
      • 獲取任務的JobGraph
      • 經過RestClusterClient#submitJob提交任務
      • 建立本地臨時文件存儲JobGraph
      • 經過RestClusterClient向集羣的rest接口提交任務
      • 處理請求響應結果
    • 重置Context
    • 重置ClassLoader

通過上述步驟,客戶端提交任務過程就完成了,主要就是經過RestClusterClient將用戶程序的JobGraph經過Rest接口提交至集羣中。

2.4.2 遠端流程

遠端響應任務提交請求的是RestServerEndpoint,其包含了多個Handler,其中JobSubmitHandler用來處理任務提交的請求;

  • 處理請求入口:JobSubmitHandler#handleRequest
  • 進行相關校驗
  • 從文件中讀取出JobGraph
  • 經過BlobClient將jar及JobGraph文件上傳至BlobServer中
  • 經過Dispatcher#submitJob提交JobGraph
  • 經過Dispatcher#runJob運行任務
    • 建立JobManagerRunner(處理leader選舉)
    • 建立JobMaster(實際執行任務入口,包含在JobManagerRunner)
    • 啓動JobManagerRunner(會進行leader選舉,ZK目錄爲leader/${jobId}/job_manager_lock)
    • 當爲主時會調用JobManagerRunner#grantLeadership方法
      • 啓動JobMaster
      • 將任務運行狀態信息寫入ZK(/${AppID}/running_job_registry/${jobId})
      • 啓動JobMaster的Endpoint
      • 開始調度任務JobMaster#startJobExecution

接下來就進行任務具體調度(構造ExecutionGraph、申請Slot等)流程,本篇文章再也不展開介紹。

3. Per-Job-Cluster模式

一個任務會對應一個Job,每提交一個做業會根據自身的狀況,都會單獨向yarn申請資源,直到做業執行完成,一個做業的失敗與否並不會影響下一個做業的正常提交和運行。獨享Dispatcher和ResourceManager,按需接受資源申請;適合規模大長時間運行的做業。

3.1 啓動任務

啓動Per-Job-Cluster任務,可經過./bin/flink run -m yarn-cluster -d -c mainClass /path/to/user/jar命令使用分離模式啓動一個集羣,即單任務單集羣;

3.2. 流程分析

與Session-Cluster相似,咱們對Per-Job-Cluster模式也分爲本地和遠端。

3.2.1 本地流程

  • 與Session-Cluster模式相似,入口也爲CliFrontend#main
  • 解析處理參數
  • 根據用戶jar、main、程序參數、savepoint信息生成PackagedProgram
  • 根據PackagedProgram建立JobGraph(對於非分離模式仍是和Session模式同樣,模式Session-Cluster)
  • 獲取集羣資源信息
  • 部署集羣YarnClusterDesriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal;後面流程與Session-Cluster相似,值得注意的是在AbstractYarnClusterDescriptor#startAppMaster中與Session-Cluster有一個顯著不一樣的就是其會將任務的JobGraph上傳至Hdfs供後續服務端使用

通過上述步驟,客戶端提交任務過程就完成了,主要涉及到文件(JobGraph和jar包)的上傳。

3.2.2 遠端流程

  • 遠端宿主在Container中的集羣入口爲YarnJobClusterEntrypoint#main
  • ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster啓動集羣
  • 建立JobDispatcherResourceManagerComponentFactory(用於建立JobDispatcherResourceManagerComponent)
  • 建立ResourceManager(YarnResourceManager)、Dispatcher(MiniDispatcher),其中在建立MiniDispatcher時會從以前的JobGraph文件中讀取出JobGraph,並啓動進行ZK選舉
  • 當爲主時會調用Dispatcher#grantLeadership方法
    • Dispatcher#recoverJobs恢復任務,獲取JobGraph
    • Dispatcher#tryAcceptLeadershipAndRunJobs確認獲取主並開始運行任務
      • Dispatcher#runJob開始運行任務(建立JobManagerRunner並啓動進行ZK選舉),後續流程與Session-Cluster相同,再也不贅述

4. 總結

Flink提供在Yarn上兩種運行模式:Session-Cluster和Per-Job-Cluster,其中Session-Cluster的資源在啓動集羣時就定義完成,後續全部做業的提交都共享該資源,做業可能會互相影響,所以比較適合小規模短期運行的做業,對於Per-Job-Cluster而言,全部做業的提交都是單獨的集羣,做業之間的運行不受影響(可能會共享CPU計算資源),所以比較適合大規模長時間運行的做業。

原文出處:https://www.cnblogs.com/leesf456/p/11136344.html

相關文章
相關標籤/搜索