第1章 簡介
1.1 概要介紹
Flink on Yarn的HA高可用模式,首先依賴於Yarn自身的高可用機制(ResourceManager高可用),並經過Yarn對JobManager進行管理,當JobManager失效時,Yarn將從新啓動JobManager。其次Flink Job在恢復時,須要依賴Checkpoint進行恢復,而Checkpoint的快照依賴於遠端的存儲:HDFS,因此HDFS也必須是高可用,同時JobManager的元數據信息也依賴於HDFS的高可用(namenode的高可用,和多副本機制),再者JobManager元數據的指針信息要依賴於Zookeeper的高可用。本章重點介紹Flink自己的高可用,其餘框架的高可用請參考筆者以前的文章。node
1.2 Flink on Yarn的優點
相對於 Standalone 模式,在Yarn 模式下有如下幾點好處:apache
1.資源按需使用,提升集羣的資源利用率;後端
2.任務有優先級,根據優先級運行做業;bash
3.基於 Yarn 調度系統,可以自動化地處理各個角色的 Failover:session
JobManager 進程和 TaskManager 進程都由 Yarn NodeManager 監控;多線程
若是 JobManager 進程異常退出,則 Yarn ResourceManager 會從新調度 JobManager 到其餘機器;app
若是 TaskManager 進程異常退出,JobManager 會收到消息並從新向 Yarn ResourceManager 申請資源,從新啓動 TaskManager。框架
第2章 Flink on Yarn模式運行的方式
2.1 Per-Job
Per-Job模式:簡答的說就是直接run job,每次提交的任務Yarn都會分配一個JobManager,執行完以後整個資源會釋放,包括JobManager和TaskManager。jvm
Per-Job模式適合比較大的任務、執行時間比較長的任務。oop
2.2 Session
Session模式:在Session模式中, Dispatcher 和 ResourceManager 是能夠複用的;當執行完Job以後JobManager並不會釋放,Session 模式也稱爲多線程模式,其特色是資源會一直存在不會釋放。使用時先啓動yarn-session,而後再提交job,每次提交job,也都會分配一個JobManager。
Session模式適合比較小的任務、執行時間比較短的任務。該模式不用頻繁的申請資源和釋放資源。
注:本章先簡單的瞭解這兩種模式和使用上的區別,筆者會在後續的章節中對其原理進行較爲詳細的剖析。敬請期待!
第3章 集羣規劃
筆者是在原有的3臺機器名爲Hadoop10*的機器上安裝的Flink,因此這裏機器名都是Hadoop100、Hadoop10一、Hadoop102。
|
Hadoop100(Flink) |
Hadoop101(Flink) |
Hadoop102(Flink) |
JobManager |
√ |
√ |
√ |
TaskManager |
√ |
√ |
√ |
第4章 下載安裝
Flink官網:https://flink.apache.org/
你們到官網進行下載,先參考我以前的文章進行環境變量的配置:大數據實操篇 No.9-Flink Standalone模式部署及使用
Flink on Yarn部署使用前必定要先安裝好Zookeeper和Hadoop(HDFS和Yarn)。
第5章 部署和使用
5.1 修改Hadoop配置
5.1.1 修改yarn-site.xml
修改hadoop配置文件/etc/hadoop/yarn-site.xml,設置application master重啓時,嘗試的最大次數。
<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>10</value> <description> The maximum number of application master execution attempts. </description> </property>
5.2 修改masters
修改conf目錄下masters文件:
hadoop100:8081 hadoop101:8081 hadoop102:8081
5.3 修改workers
修改conf目錄下workers文件:
hadoop100 hadoop101 hadoop102
接下來,筆者用2種模式單獨進行部署演示。
5.4 Per-Job模式
5.4.1 修改flink-conf.yaml
修改flink配置文件/conf/flink-conf.yaml
# jobmanager和taskmanager、其餘client的RPC通訊IP地址,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby jobmanager.rpc.address: localhost # jobmanager和taskmanager、其餘client的RPC通訊端口,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby jobmanager.rpc.port: 6123 # jobmanager JVM heap 內存大小 jobmanager.memory.process.size: 1024m # taskmanager JVM heap 內存大小 taskmanager.memory.process.size: 1024m # 每一個taskmanager提供的任務slots數量 # 並行度等於TM 數量乘以每一個TM 的Solts 數量 TM=並行度/Solts數量 若是slots數量大於8 則只會起一個TM taskmanager.numberOfTaskSlots: 3 # 並行計算個數 parallelism.default: 6 # 高可用模式 high-availability: zookeeper # JobManager元數據保留在文件系統storageDir中 指向此狀態的指針存儲在ZooKeeper中 high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/ # Zookeeper集羣 high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181 # 在zookeeper下的根目錄 high-availability.zookeeper.path.root: /flink_yarn # zookeeper節點下的集羣ID 該節點下放置了集羣所需的全部協調數據 多個flink集羣鏈接同一套zookeeper集羣須要配置各自不一樣的集羣ID,官方建議這個配置最好去掉,由於在 Yarn(以及Mesos)模式下,cluster-id 若是不配置的話,會配置成 Yarn 上的 Application ID ,從而能夠保證惟一性。 #high-availability.cluster-id: /default_yarn # 單個flink job重啓次數 必須小於等於yarn-site.xml中Application Master配置的嘗試次數 yarn.application-attempts: 10 #============================================================================== # Fault tolerance and checkpointing #============================================================================== # jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend) state.backend: rocksdb # 檢查點的默認目錄。Flink支持的文件系統中存儲檢查點的數據文件和元數據的默認目錄。必須從全部參與的進程/節點(即全部TaskManager和JobManager)訪問存儲路徑。 state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints # 保存點的默認目錄。由狀態後端用於將保存點寫入文件系統(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。 state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints # 是否應該建立增量檢查點。對於增量檢查點,只存儲前一個檢查點的差別,而不存儲完整的檢查點狀態。一些狀態後端可能不支持增量檢查點並忽略此選項。 state.backend.incremental: false # jobmanager故障恢復策略,指定做業計算如何從任務失敗中恢復 full從新啓動全部任務以恢復做業 region從新啓動可能受任務失敗影響的全部任務,region在目前(flink1.11)只對批處理有效,實時計算任然時full jobmanager.execution.failover-strategy: region # 全局檢查點的保留數量 state.checkpoints.num-retained: 3 # 本地恢復。默認狀況下,本地恢復處於禁用狀態。本地恢復當前僅涵蓋keyed state backends。當前,MemoryStateBackend不支持本地恢復。 state.backend.local-recovery: true # 存儲基於文件的狀態以進行本地恢復的根目錄。本地恢復當前僅涵蓋keyed state backends taskmanager.state.local.root-dirs: /opt/flink-tm-state
修改完全部配置後,注意將配置分發到其餘機器上。
注意:Hadoop中的yarn.resourcemanager.am.max-attemps和Flink中的yarn.application-attempts配置含義:
flink job失敗重啓次數,嘗試從新啓動9次(9次重試+ 1次初始嘗試)
flink job(在yarn中稱爲application)在Jobmanager(或者叫Application Master)恢復時,容許重啓的最大次數。
注意,Flink On Yarn環境中,當Jobmanager(Application Master)失敗時,yarn會嘗試重啓JobManager(AM),重啓後,會從新啓動Flink的Job(application)。所以,flink中的yarn.application-attempts配置不能超過yarn中的yarn.resourcemanager.am.max-attemps。
5.4.2 運行做業
Flink各版本的運行命令可能存在差別,能夠先經過-h查看幫助。
$ bin/flink run -h
運行Job
$ bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-job-example
5.4.3 查看運行狀況
資源已經受Yarn的管理,能夠直接從Yarn集羣的Web UI界面打開ApplicationMaster,從而進入到Flink的Web UI控制檯:
運行中的做業:
運行完成的做業:
運行完成後,JobManager和TaskManager就被釋放了,此時Yarn中顯示的運行結果以下:
Linux系統中顯示的運行信息:
5.5 Session模式
5.5.1 修改flink-conf.yaml
修改flink配置文件/conf/flink-conf.yaml
# jobmanager和taskmanager、其餘client的RPC通訊IP地址,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby jobmanager.rpc.address: localhost # jobmanager和taskmanager、其餘client的RPC通訊端口,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby jobmanager.rpc.port: 6123 # jobmanager JVM heap 內存大小 jobmanager.memory.process.size: 1024m # taskmanager JVM heap 內存大小 taskmanager.memory.process.size: 1024m # 每一個taskmanager提供的任務slots數量 taskmanager.numberOfTaskSlots: 3 # 並行計算個數 parallelism.default: 3 # 高可用模式 high-availability: zookeeper # JobManager元數據保留在文件系統storageDir中 指向此狀態的指針存儲在ZooKeeper中 high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/ # Zookeeper集羣 high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181 # 在zookeeper下的根目錄 high-availability.zookeeper.path.root: /flink_yarn # zookeeper節點下的集羣ID 該節點下放置了集羣所需的全部協調數據 多個flink集羣鏈接同一套zookeeper集羣須要配置各自不一樣的集羣ID,官方建議這個配置最好去掉,由於在 Yarn(以及Mesos)模式下,cluster-id 若是不配置的話,會配置成 Yarn 上的 Application ID ,從而能夠保證惟一性。 #high-availability.cluster-id: /default_yarn # 單個flink job重啓次數 必須小於等於yarn-site.xml中Application Master配置的嘗試次數 yarn.application-attempts: 10 #============================================================================== # Fault tolerance and checkpointing #============================================================================== # jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend) state.backend: rocksdb # 檢查點的默認目錄。Flink支持的文件系統中存儲檢查點的數據文件和元數據的默認目錄。必須從全部參與的進程/節點(即全部TaskManager和JobManager)訪問存儲路徑。 state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints # 保存點的默認目錄。由狀態後端用於將保存點寫入文件系統(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。 state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints # 是否應該建立增量檢查點。對於增量檢查點,只存儲前一個檢查點的差別,而不存儲完整的檢查點狀態。一些狀態後端可能不支持增量檢查點並忽略此選項。 state.backend.incremental: false # jobmanager故障恢復策略,指定做業計算如何從任務失敗中恢復 full從新啓動全部任務以恢復做業 region從新啓動可能受任務失敗影響的全部任務,region在目前(flink1.11)只對批處理有效,實時計算任然時full jobmanager.execution.failover-strategy: region # 全局檢查點的保留數量 state.checkpoints.num-retained: 3 # 本地恢復。默認狀況下,本地恢復處於禁用狀態。本地恢復當前僅涵蓋keyed state backends。當前,MemoryStateBackend不支持本地恢復。 state.backend.local-recovery: true # 存儲基於文件的狀態以進行本地恢復的根目錄。本地恢復當前僅涵蓋keyed state backends taskmanager.state.local.root-dirs: /opt/flink-tm-state
修改完全部配置後,注意將配置分發到其餘機器上。
5.5.2 運行/中止
這介紹Session的兩種運行模式,根據本身的實際業務場景使用:
- 客戶端模式
對於客戶端模式而言,能夠啓動多個yarn session,一個yarn session模式對應一個JobManager,並按照需求提交做業,同一個Session中能夠提交多個Flink做業。
- 分離模式
只能啓動一個yarn-sission,若是啓動多個,後面的session會一直處於等待,同一個yarn-session中能夠提交多個Flink做業,經過-d參數指定,表示即客戶端在啓動Flink Yarn Session後,就再也不屬於Yarn Cluster的一部分。
$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -d
啓動yarn-session前能夠先了解一下參數的含義,查看參數說明
$ bin/yarn-session.sh -h
啓動yarn-session
$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024
啓動yarn-session後,檢查進程:
[zihao@zookeeper110 ~]$ xsh jps -----------zookeeper110----------- 7252 QuorumPeerMain 7958 Jps -----------zookeeper111----------- 7714 Jps 7130 QuorumPeerMain -----------zookeeper112----------- 7747 Jps 7116 QuorumPeerMain -----------hadoop100----------- 8162 DataNode 8549 DFSZKFailoverController 8758 NodeManager 8375 JournalNode 14167 FlinkYarnSessionCli 14375 Jps 8031 NameNode -----------hadoop101----------- 8002 DataNode 8163 JournalNode 8259 DFSZKFailoverController 7910 NameNode 8838 NodeManager 18536 YarnSessionClusterEntrypoint 18633 Jps 8701 ResourceManager -----------hadoop102----------- 7666 ResourceManager 7523 JournalNode 10405 Jps 7769 NodeManager 7434 DataNode
YarnSessionClusterEntrypoint能夠理解爲Flink在Yarn上啓動的ApplicationMaster,其內部就運行着三各組件:Dispatcher、ResourceManager和 JobManager。它們是在同一個jvm進程中。
中止Yarn-session
hadoop路徑查看yarn上運行的任務
$ bin/yarn application –list
中止對應Application-Id對應的任務
$ bin/yarn application -kill application_1598166743428_0001
向Yarn上提交做業
Yarn-session啓動後,界面上會提示一個地址(主機和端口),這個地址就是JobManager(也是ApplicationMaster)的地址。
在提交job的時候,能夠在yarn-session啓動的機器上直接提交,或者在其餘機器上經過-m指定jobmanager的地址進行提交:
$ bin/flink run -m hadoop101:39998 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-example
注:筆者這裏運行的是wordcount示例,將以前HDFS上已有的數據進行了wordcount分析,並將分析結果輸出到HDFS上。
5.5.3 查看運行狀況
啓動yarn-session後:
提交做業,做業開始運行:
做業運行完成:
在HDFS中查看運行結果:
第6章 高可用演示
在Flink on Yarn模式中,Session方式的高可用是針對JobManager(ApplicationMaster)的高可用,而session模式中啓動yarn-session後,該進程常駐在Yarn中,其高可用依賴於Yarn自身的資源管理機制,在JobManager(ApplicationMaster)掛掉以後,Yarn會在集羣中重啓該進程。
Per-Job方式,內部複用的是standalone JobManager 進程的HA,本章就再也不介紹了,詳見筆者上一篇文章:大數據實操篇 No.10-Flink Standalone集羣HA高可用部署
6.1 啓動yarn-session
先按前幾章介紹的,把Zookeeper、HDFS、Yarn、Flink所有啓動起來。並啓動yarn-session。
此時,咱們查看如下進程,發現YarnSessionClusterEntrypoint這個進程啓動再hadoop100這臺機器上:
[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps -----------zookeeper110----------- 7142 QuorumPeerMain 7192 Jps -----------zookeeper111----------- 7156 Jps 7103 QuorumPeerMain -----------zookeeper112----------- 7133 Jps 7087 QuorumPeerMain -----------hadoop100----------- 7872 DFSZKFailoverController 7652 JournalNode 8260 FlinkYarnSessionCli 7285 NameNode 8006 NodeManager 8630 YarnSessionClusterEntrypoint 8712 Jps 7402 DataNode -----------hadoop101----------- 7217 NameNode 8017 NodeManager 8356 Jps 7579 JournalNode 7867 ResourceManager 7293 DataNode 7869 DFSZKFailoverController -----------hadoop102----------- 7795 Jps 7319 ResourceManager 7209 JournalNode 7099 DataNode 7439 NodeManager
6.2 查看Flink Web UI
經過Yarn集羣管理界面打開Flink Web UI:
6.3 Kill掉ApplicationMaster
Kill掉YarnSessionClusterEntrypoint(ApplicationMaster)進程,此時Flink Web UI界面會短暫的不可用,Yarn正在嘗試從新啓動ApplicationMaster。重試次數由Yarn配置中的yarn.resourcemanager.am.max-attempts和Flink配置中的yarn.application-attempts決定。
稍等片刻後咱們再從新再yarn集羣管理界面打開Flink Web UI:
在JobManager的log中咱們也能發現此次leader選舉的日誌信息:
6.4 查看恢復後的進程
再從新查看進程,發現YarnSessionClusterEntrypoint已經從新在hadoop102上啓動了:
[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps -----------zookeeper110----------- 7237 Jps 7142 QuorumPeerMain -----------zookeeper111----------- 7188 Jps 7103 QuorumPeerMain -----------zookeeper112----------- 7165 Jps 7087 QuorumPeerMain -----------hadoop100----------- 7872 DFSZKFailoverController 7652 JournalNode 8260 FlinkYarnSessionCli 7285 NameNode 8006 NodeManager 7402 DataNode 8875 Jps -----------hadoop101----------- 7217 NameNode 8017 NodeManager 7579 JournalNode 7867 ResourceManager 7293 DataNode 7869 DFSZKFailoverController 8447 Jps -----------hadoop102----------- 8084 YarnSessionClusterEntrypoint 7319 ResourceManager 8151 Jps 7209 JournalNode 7099 DataNode 7439 NodeManager
這裏就演示完了Flink on Yarn Session模式下,JobManager(ApplicaionMaster)的高可用。
至此,Flink on Yarn集羣高可用模式的部署及使用就完成了。 後續的章節咱們將開始進入實時計算整條鏈路的實戰環節!