大數據實操篇 No.11-Flink on Yarn集羣HA高可用部署及使用

第1章 簡介

1.1 概要介紹

Flink on Yarn的HA高可用模式,首先依賴於Yarn自身的高可用機制(ResourceManager高可用),並經過Yarn對JobManager進行管理,當JobManager失效時,Yarn將從新啓動JobManager。其次Flink Job在恢復時,須要依賴Checkpoint進行恢復,而Checkpoint的快照依賴於遠端的存儲:HDFS,因此HDFS也必須是高可用,同時JobManager的元數據信息也依賴於HDFS的高可用(namenode的高可用,和多副本機制),再者JobManager元數據的指針信息要依賴於Zookeeper的高可用。本章重點介紹Flink自己的高可用,其餘框架的高可用請參考筆者以前的文章。node

1.2 Flink on Yarn的優點

相對於 Standalone 模式,在Yarn 模式下有如下幾點好處apache

1.資源按需使用,提升集羣的資源利用率;後端

2.任務有優先級,根據優先級運行做業;bash

3.基於 Yarn 調度系統,可以自動化地處理各個角色的 Failover:session

      JobManager 進程和 TaskManager 進程都由 Yarn NodeManager 監控;多線程

      若是 JobManager 進程異常退出,則 Yarn ResourceManager 會從新調度 JobManager 到其餘機器;app

      若是 TaskManager 進程異常退出,JobManager 會收到消息並從新向 Yarn ResourceManager 申請資源,從新啓動 TaskManager。框架

 

第2章 Flink on Yarn模式運行的方式

2.1 Per-Job

Per-Job模式:簡答的說就是直接run job,每次提交的任務Yarn都會分配一個JobManager,執行完以後整個資源會釋放,包括JobManager和TaskManager。jvm

Per-Job模式適合比較大的任務、執行時間比較長的任務。oop

2.2 Session

Session模式:在Session模式中, Dispatcher 和 ResourceManager 是能夠複用的;當執行完Job以後JobManager並不會釋放,Session 模式也稱爲多線程模式,其特色是資源會一直存在不會釋放。使用時先啓動yarn-session,而後再提交job,每次提交job,也都會分配一個JobManager。
Session模式適合比較小的任務、執行時間比較短的任務。該模式不用頻繁的申請資源和釋放資源。

注:本章先簡單的瞭解這兩種模式和使用上的區別,筆者會在後續的章節中對其原理進行較爲詳細的剖析。敬請期待!

 

第3章 集羣規劃

筆者是在原有的3臺機器名爲Hadoop10*的機器上安裝的Flink,因此這裏機器名都是Hadoop100、Hadoop10一、Hadoop102。 

 

Hadoop100(Flink)

Hadoop101(Flink)

Hadoop102(Flink)

JobManager

TaskManager

 

第4章 下載安裝

Flink官網:https://flink.apache.org/

你們到官網進行下載,先參考我以前的文章進行環境變量的配置:大數據實操篇 No.9-Flink Standalone模式部署及使用

Flink on Yarn部署使用前必定要先安裝好Zookeeper和Hadoop(HDFS和Yarn)。

 

第5章 部署和使用

5.1 修改Hadoop配置

5.1.1 修改yarn-site.xml

修改hadoop配置文件/etc/hadoop/yarn-site.xml,設置application master重啓時,嘗試的最大次數。

<property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>10</value>
    <description>
    The maximum number of application master execution attempts.
    </description>
</property>

5.2 修改masters

 修改conf目錄下masters文件:

hadoop100:8081
hadoop101:8081
hadoop102:8081

5.3 修改workers

 修改conf目錄下workers文件:

hadoop100
hadoop101
hadoop102

接下來,筆者用2種模式單獨進行部署演示。

5.4 Per-Job模式

5.4.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml

# jobmanager和taskmanager、其餘client的RPC通訊IP地址,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby
jobmanager.rpc.address: localhost

# jobmanager和taskmanager、其餘client的RPC通訊端口,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby
jobmanager.rpc.port: 6123

# jobmanager JVM heap 內存大小
jobmanager.memory.process.size: 1024m

# taskmanager JVM heap 內存大小
taskmanager.memory.process.size: 1024m

# 每一個taskmanager提供的任務slots數量
# 並行度等於TM 數量乘以每一個TM 的Solts 數量 TM=並行度/Solts數量 若是slots數量大於8 則只會起一個TM
taskmanager.numberOfTaskSlots: 3

# 並行計算個數
parallelism.default: 6

# 高可用模式
high-availability: zookeeper

# JobManager元數據保留在文件系統storageDir中 指向此狀態的指針存儲在ZooKeeper中
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/

# Zookeeper集羣
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181

# 在zookeeper下的根目錄
high-availability.zookeeper.path.root: /flink_yarn

# zookeeper節點下的集羣ID 該節點下放置了集羣所需的全部協調數據 多個flink集羣鏈接同一套zookeeper集羣須要配置各自不一樣的集羣ID,官方建議這個配置最好去掉,由於在 Yarn(以及Mesos)模式下,cluster-id 若是不配置的話,會配置成 Yarn 上的 Application ID ,從而能夠保證惟一性。
#high-availability.cluster-id: /default_yarn

# 單個flink job重啓次數 必須小於等於yarn-site.xml中Application Master配置的嘗試次數
yarn.application-attempts: 10

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
state.backend: rocksdb
# 檢查點的默認目錄。Flink支持的文件系統中存儲檢查點的數據文件和元數據的默認目錄。必須從全部參與的進程/節點(即全部TaskManager和JobManager)訪問存儲路徑。
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
# 保存點的默認目錄。由狀態後端用於將保存點寫入文件系統(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
# 是否應該建立增量檢查點。對於增量檢查點,只存儲前一個檢查點的差別,而不存儲完整的檢查點狀態。一些狀態後端可能不支持增量檢查點並忽略此選項。
state.backend.incremental: false
# jobmanager故障恢復策略,指定做業計算如何從任務失敗中恢復 full從新啓動全部任務以恢復做業 region從新啓動可能受任務失敗影響的全部任務,region在目前(flink1.11)只對批處理有效,實時計算任然時full
jobmanager.execution.failover-strategy: region
# 全局檢查點的保留數量
state.checkpoints.num-retained: 3
# 本地恢復。默認狀況下,本地恢復處於禁用狀態。本地恢復當前僅涵蓋keyed state backends。當前,MemoryStateBackend不支持本地恢復。
state.backend.local-recovery: true
# 存儲基於文件的狀態以進行本地恢復的根目錄。本地恢復當前僅涵蓋keyed state backends
taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完全部配置後,注意將配置分發到其餘機器上。

注意:Hadoop中的yarn.resourcemanager.am.max-attemps和Flink中的yarn.application-attempts配置含義:

flink job失敗重啓次數,嘗試從新啓動9次(9次重試+ 1次初始嘗試)
flink job(在yarn中稱爲application)在Jobmanager(或者叫Application Master)恢復時,容許重啓的最大次數。
注意,Flink On Yarn環境中,當Jobmanager(Application Master)失敗時,yarn會嘗試重啓JobManager(AM),重啓後,會從新啓動Flink的Job(application)。所以,flink中的yarn.application-attempts配置不能超過yarn中的yarn.resourcemanager.am.max-attemps。

5.4.2 運行做業

Flink各版本的運行命令可能存在差別,能夠先經過-h查看幫助。

$ bin/flink run -h

運行Job

$ bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-job-example

 5.4.3 查看運行狀況

資源已經受Yarn的管理,能夠直接從Yarn集羣的Web UI界面打開ApplicationMaster,從而進入到Flink的Web UI控制檯:

運行中的做業: 

運行完成的做業: 

運行完成後,JobManager和TaskManager就被釋放了,此時Yarn中顯示的運行結果以下: 

Linux系統中顯示的運行信息: 

5.5 Session模式

5.5.1 修改flink-conf.yaml

修改flink配置文件/conf/flink-conf.yaml

# jobmanager和taskmanager、其餘client的RPC通訊IP地址,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby
jobmanager.rpc.address: localhost

# jobmanager和taskmanager、其餘client的RPC通訊端口,TaskManager用於鏈接到JobManager/ResourceManager 。HA模式不用配置此項,在master文件中配置,由zookeeper選出leader與standby
jobmanager.rpc.port: 6123

# jobmanager JVM heap 內存大小
jobmanager.memory.process.size: 1024m

# taskmanager JVM heap 內存大小
taskmanager.memory.process.size: 1024m

# 每一個taskmanager提供的任務slots數量
taskmanager.numberOfTaskSlots: 3

# 並行計算個數
parallelism.default: 3

# 高可用模式
high-availability: zookeeper

# JobManager元數據保留在文件系統storageDir中 指向此狀態的指針存儲在ZooKeeper中
high-availability.storageDir: hdfs://hadoop100:9000/flink/ha/

# Zookeeper集羣
high-availability.zookeeper.quorum: zookeeper110:2181,zookeeper111:2181,zookeeper112:2181

# 在zookeeper下的根目錄
high-availability.zookeeper.path.root: /flink_yarn

# zookeeper節點下的集羣ID 該節點下放置了集羣所需的全部協調數據 多個flink集羣鏈接同一套zookeeper集羣須要配置各自不一樣的集羣ID,官方建議這個配置最好去掉,由於在 Yarn(以及Mesos)模式下,cluster-id 若是不配置的話,會配置成 Yarn 上的 Application ID ,從而能夠保證惟一性。
#high-availability.cluster-id: /default_yarn

# 單個flink job重啓次數 必須小於等於yarn-site.xml中Application Master配置的嘗試次數
yarn.application-attempts: 10

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
state.backend: rocksdb
# 檢查點的默認目錄。Flink支持的文件系統中存儲檢查點的數據文件和元數據的默認目錄。必須從全部參與的進程/節點(即全部TaskManager和JobManager)訪問存儲路徑。
state.checkpoints.dir: hdfs://hadoop100:9000/flink/checkpoints
# 保存點的默認目錄。由狀態後端用於將保存點寫入文件系統(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)。
state.savepoints.dir: hdfs://hadoop100:9000/flink/savepoints
# 是否應該建立增量檢查點。對於增量檢查點,只存儲前一個檢查點的差別,而不存儲完整的檢查點狀態。一些狀態後端可能不支持增量檢查點並忽略此選項。
state.backend.incremental: false
# jobmanager故障恢復策略,指定做業計算如何從任務失敗中恢復 full從新啓動全部任務以恢復做業 region從新啓動可能受任務失敗影響的全部任務,region在目前(flink1.11)只對批處理有效,實時計算任然時full
jobmanager.execution.failover-strategy: region
# 全局檢查點的保留數量
state.checkpoints.num-retained: 3
# 本地恢復。默認狀況下,本地恢復處於禁用狀態。本地恢復當前僅涵蓋keyed state backends。當前,MemoryStateBackend不支持本地恢復。
state.backend.local-recovery: true
# 存儲基於文件的狀態以進行本地恢復的根目錄。本地恢復當前僅涵蓋keyed state backends
taskmanager.state.local.root-dirs: /opt/flink-tm-state

修改完全部配置後,注意將配置分發到其餘機器上。 

5.5.2 運行/中止

這介紹Session的兩種運行模式,根據本身的實際業務場景使用:

  • 客戶端模式

對於客戶端模式而言,能夠啓動多個yarn session,一個yarn session模式對應一個JobManager,並按照需求提交做業,同一個Session中能夠提交多個Flink做業。

  • 分離模式

只能啓動一個yarn-sission,若是啓動多個,後面的session會一直處於等待,同一個yarn-session中能夠提交多個Flink做業,經過-d參數指定,表示即客戶端在啓動Flink Yarn Session後,就再也不屬於Yarn Cluster的一部分。

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -d

啓動yarn-session前能夠先了解一下參數的含義,查看參數說明

$ bin/yarn-session.sh -h

啓動yarn-session

$ bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024

啓動yarn-session後,檢查進程:

[zihao@zookeeper110 ~]$ xsh jps
-----------zookeeper110-----------
7252 QuorumPeerMain
7958 Jps
-----------zookeeper111-----------
7714 Jps
7130 QuorumPeerMain
-----------zookeeper112-----------
7747 Jps
7116 QuorumPeerMain
-----------hadoop100-----------
8162 DataNode
8549 DFSZKFailoverController
8758 NodeManager
8375 JournalNode
14167 FlinkYarnSessionCli
14375 Jps
8031 NameNode
-----------hadoop101-----------
8002 DataNode
8163 JournalNode
8259 DFSZKFailoverController
7910 NameNode
8838 NodeManager
18536 YarnSessionClusterEntrypoint
18633 Jps
8701 ResourceManager
-----------hadoop102-----------
7666 ResourceManager
7523 JournalNode
10405 Jps
7769 NodeManager
7434 DataNode

YarnSessionClusterEntrypoint能夠理解爲Flink在Yarn上啓動的ApplicationMaster,其內部就運行着三各組件:Dispatcher、ResourceManager和 JobManager。它們是在同一個jvm進程中。

中止Yarn-session
hadoop路徑查看yarn上運行的任務

$ bin/yarn application –list

中止對應Application-Id對應的任務

$ bin/yarn application -kill application_1598166743428_0001

向Yarn上提交做業

Yarn-session啓動後,界面上會提示一個地址(主機和端口),這個地址就是JobManager(也是ApplicationMaster)的地址。

 在提交job的時候,能夠在yarn-session啓動的機器上直接提交,或者在其餘機器上經過-m指定jobmanager的地址進行提交:

$ bin/flink run -m hadoop101:39998 ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/user/hive/warehouse/covid19count.db/t_infectcount/partdate=2020071301 -output hdfs://hadoop100:9000/flink/wordcount-example

注:筆者這裏運行的是wordcount示例,將以前HDFS上已有的數據進行了wordcount分析,並將分析結果輸出到HDFS上。

5.5.3 查看運行狀況

 啓動yarn-session後:

 提交做業,做業開始運行:

 做業運行完成:

 

 在HDFS中查看運行結果:

 

第6章 高可用演示

在Flink on Yarn模式中,Session方式的高可用是針對JobManager(ApplicationMaster)的高可用,而session模式中啓動yarn-session後,該進程常駐在Yarn中,其高可用依賴於Yarn自身的資源管理機制,在JobManager(ApplicationMaster)掛掉以後,Yarn會在集羣中重啓該進程。

Per-Job方式,內部複用的是standalone JobManager 進程的HA,本章就再也不介紹了,詳見筆者上一篇文章:大數據實操篇 No.10-Flink Standalone集羣HA高可用部署

6.1 啓動yarn-session

先按前幾章介紹的,把Zookeeper、HDFS、Yarn、Flink所有啓動起來。並啓動yarn-session。

此時,咱們查看如下進程,發現YarnSessionClusterEntrypoint這個進程啓動再hadoop100這臺機器上:

[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-----------zookeeper110-----------
7142 QuorumPeerMain
7192 Jps
-----------zookeeper111-----------
7156 Jps
7103 QuorumPeerMain
-----------zookeeper112-----------
7133 Jps
7087 QuorumPeerMain
-----------hadoop100-----------
7872 DFSZKFailoverController
7652 JournalNode
8260 FlinkYarnSessionCli
7285 NameNode
8006 NodeManager
8630 YarnSessionClusterEntrypoint
8712 Jps
7402 DataNode
-----------hadoop101-----------
7217 NameNode
8017 NodeManager
8356 Jps
7579 JournalNode
7867 ResourceManager
7293 DataNode
7869 DFSZKFailoverController
-----------hadoop102-----------
7795 Jps
7319 ResourceManager
7209 JournalNode
7099 DataNode
7439 NodeManager

6.2 查看Flink Web UI

經過Yarn集羣管理界面打開Flink Web UI:

 

6.3 Kill掉ApplicationMaster 

Kill掉YarnSessionClusterEntrypoint(ApplicationMaster)進程,此時Flink Web UI界面會短暫的不可用,Yarn正在嘗試從新啓動ApplicationMaster。重試次數由Yarn配置中的yarn.resourcemanager.am.max-attempts和Flink配置中的yarn.application-attempts決定。

稍等片刻後咱們再從新再yarn集羣管理界面打開Flink Web UI:

 

在JobManager的log中咱們也能發現此次leader選舉的日誌信息:

6.4 查看恢復後的進程

再從新查看進程,發現YarnSessionClusterEntrypoint已經從新在hadoop102上啓動了:

[zihao@zookeeper110 apache-zookeeper-3.5.7-bin]$ xsh jps
-----------zookeeper110-----------
7237 Jps
7142 QuorumPeerMain
-----------zookeeper111-----------
7188 Jps
7103 QuorumPeerMain
-----------zookeeper112-----------
7165 Jps
7087 QuorumPeerMain
-----------hadoop100-----------
7872 DFSZKFailoverController
7652 JournalNode
8260 FlinkYarnSessionCli
7285 NameNode
8006 NodeManager
7402 DataNode
8875 Jps
-----------hadoop101-----------
7217 NameNode
8017 NodeManager
7579 JournalNode
7867 ResourceManager
7293 DataNode
7869 DFSZKFailoverController
8447 Jps
-----------hadoop102-----------
8084 YarnSessionClusterEntrypoint
7319 ResourceManager
8151 Jps
7209 JournalNode
7099 DataNode
7439 NodeManager

這裏就演示完了Flink on Yarn Session模式下,JobManager(ApplicaionMaster)的高可用。

 

至此,Flink on Yarn集羣高可用模式的部署及使用就完成了。 後續的章節咱們將開始進入實時計算整條鏈路的實戰環節!

相關文章
相關標籤/搜索