數棧技術分享:一文帶你瞭解Flink jm、tm啓動過程和資源分配

1、JM啓動過程

一、從日誌角度分析啓動流程git

1)client生成jobGraphgithub

詳情請參考:
https://www.bilibili.com/video/BV13K4y1P7ri

2)Yarn RM接收到請求(和yarn交互不重點分析)apache

3)在被分配的節點上的工做目錄下啓動launch_container.sh後端

4)在perJob模式下,最終調用的是YarnJobClusterEntrypoint併發

5)初始化相關運行環境,打印軟件版本、運行環境、命令行參數、classpath 等信息分佈式

6)加載flink配置文件、初始化文件系統、啓動各類內部服務(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等)

7)啓動Flink資源管理核心組件ResourceManager(包含 YarnResourceManager 和 SlotManager 兩個子組件)

8)啓動Dispatcher加載JobGraph 文件、並啓動JobManager

9)JobManager開始執行ExecutionGraph,向 ResourceManager申請資源ide

10)Flink ResourceManager 接收到新分配的 Container 資源後,準備好 TaskManager 啓動上下文

11)TaskManager 進程加載並運行 YarnTaskExecutorRunner(Flink TaskManager入口類),初始化流程完成後啓動 TaskExecutor(負責執行Task相關操做)

12)TaskExecutor向ResourceManager註冊,向SlotManager彙報本身的 Slot 資源與狀態

13)JobManager向TaskExecutor提交task,TaskExecutor啓動新的線程運行Task工具

二、總體流程分析url

1)輸出各軟件版本及運行環境信息、命令行參數項、classpath等信息
2)註冊處理各類SIGNAL的handler:記錄到日誌
3)註冊JVM關閉保障的shutdown hook:避免JVM退出時被其餘shutdown hook阻塞
4)打印YARN運行環境信息:用戶名
5)從運行目錄中加載flink confspa

三、AM啓動過程

1)建立並啓動各種內部服務(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)

2)將RPC address和port更新到flink conf配置

3)建立並啓動resourceManager對象(Flink資源管理核心組件,包含YarnResourceManager和SlotManager兩個子組件,YarnResourceManager負責外部資源管理,與YARN RM創建通訊並保持心跳,申請或釋放TaskManager資源,註銷應用等;SlotManager則負責內部資源管理,維護所有Slot信息和狀態)

4)建立並啓動dispatcher(負責接收用戶提供的做業,而且負責爲這個新提交的做業拉起一個新的 JobManager)及相關服務(包括REST endpoint等)並加載JobGraph。

2、JM資源分配

JobManager開始執行ExecutionGraph,向ResourceManager申請資源。

ResourceManager將資源請求加入等待請求隊列,並經過心跳向YARN RM申請新的Container資源來啓動TaskManager進程。

後續流程若是有空閒Slot資源,SlotManager將其分配給等待請求隊列中匹配的請求,不用再經過YarnResourceManager申請新的TaskManager。

 

Flink ResourceManager接收到新分配的Container資源後,準備好TaskManager啓動上下文(ContainerLauncherContext,生成TaskManager配置並上傳至分佈式存儲,配置其餘依賴和環境變量等)。

而後向YARN NM申請啓動TaskManager進程,YARN NM啓動Container的流程與AM Container啓動流程基本相似。

3、TM啓動過程

輸出各軟件版本及運行環境信息、命令行參數項、classpath等信息

註冊處理各類SIGNAL的handler:記錄到日誌

註冊JVM關閉保障的shutdown hook:避免JVM退出時被其餘shutdown hook阻塞

加載flink配置文件、初始化文件系統、啓動各類內部服務(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry等)

啓動tm後就能夠經過RPC接收遠程調用,submitTask就是接收任務的服務。

回到在JM端啓動scheduler後,就開始調度Execution,在Execution的deploy()方法中經過rpc調用TM的submitTask接口。

交互流程圖以下:

當submitTask收到請求後加載jobInformation和taskInformation文件,初始化jobInformation和taskInformation,而後構造Task,啓動Task線程,最終調用AbstractInvokable.invoke方法。

  • invokable.invoke( )將根據nameOfInvokableClass的不一樣調度不一樣的任務,包括批任務、Source任務、Sink任務、流任務
  • DataSourceTask:Kafka Source
  • StreamTask:中間算子
  • DataSinkTask:Kafka Sink

這裏以StreamTask例分析

  • 初始化、run、close
  • 初始化:建立狀態後端、operator配置、特殊task初始化、恢復算子的狀態、richfunction open
  • run:執行task,處理record併發往下游
  • close:關閉和清理操做

這裏以flinkX中的代碼爲例:

會被invoke()中的initialize-operator-states()執行並調用到DtInputFormatSourceFunction的initializeState方法恢復狀態。

這裏以flinkX中的代碼爲例:

會被invoke()中的open-operators()執行並調用到DtInputFormatSourceFunction的open方法恢復狀態作一些初始化工做。

這裏以flinkX中的代碼爲例:

會被invoke()中的run()執行並調用到DtInputFormatSourceFunction的run讀取數據並往下游發送。

通過上面分析,任務已經啓動,並等待數據流動。

相關參考:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdfhttps://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf

https://zhuanlan.zhihu.com/p/87132673https://zhuanlan.zhihu.com/p/87132673

 

數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkXFlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!

github開源項目:https://github.com/DTStack/flinkx

gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx

相關文章
相關標籤/搜索