Flink部署及做業提交(On YARN)

Hadoop環境快速搭建

官方文檔:html

在上一篇 Flink部署及做業提交(On Flink Cluster) 文章中,咱們介紹瞭如何編譯部署Flink自身的資源分配和管理系統,並將做業提交到該系統上去運行。但一般來說這種方式用得很少,由於在企業中,可能會使用不一樣的分佈式計算框架,如Spark、Storm或MapReduce等。java

若是每一種框架都須要搭建各自的資源分配和管理系統,就沒法共享資源,致使資源利用率低。而且大多企業通常會使用Hadoop生態的相關組件作做爲大數據處理平臺的底座,如HDFS、Hive、YARN等。node

其中 YARN 是資源調度框架、通用的資源管理系統,能夠爲上層應用提供統一的資源管理和調度,Spark、Flink、Storm等計算框架均可以集成到 YARN 上。如此一來這些計算框架能夠享受總體的資源調度,進而提升集羣資源的利用率,這也就是所謂的 xxx on YARN。所以,絕大部分企業都是將計算做業放到 YARN 上進行調度,而不是每種計算框架都單獨搭一個資源分配和管理系統。這也是爲何要單獨介紹Flink On YARN的緣由。web

想要讓Flink做業跑在 YARN 上,咱們首先得搭建一個Hadoop環境,爲了簡單這裏只搭建單節點環境。我這裏使用的是CDH的Hadoop發行版。下載地址以下:shell

首先須要安裝好Java運行環境,因爲比較簡單這裏就不演示了:apache

[root@hadoop01 ~]# echo ${JAVA_HOME}
/usr/local/jdk/11
[root@hadoop01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@hadoop01 ~]#

配置hosts,將主機名與本地ip創建一個映射關係:編程

[root@hadoop01 ~]# vim /etc/hosts
192.168.243.142   hadoop01

關閉防火牆:vim

[root@hadoop01 ~]# systemctl stop firewalld && systemctl disable firewalld

配置免密登陸:api

[root@hadoop01 ~]# ssh-keygen -t rsa      # 生成密鑰對
[root@hadoop01 ~]# ssh-copy-id hadoop01    # 拷貝公鑰並追加到本身的受權列表文件中

而後就能夠開始安裝Hadoop了,這裏採用 hadoop-2.6.0-cdh5.16.2 版本做爲演示,複製下載連接到系統上進行下載:瀏覽器

[root@hadoop01 ~]# cd /usr/local/src
[root@hadoop01 /usr/local/src]# wget http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.16.2.tar.gz

解壓下載好的壓縮包:

[root@hadoop01 /usr/local/src]# tar -zxvf hadoop-2.6.0-cdh5.16.2.tar.gz -C /usr/local

配置系統環境變量:

[root@hadoop01 ~]# vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-2.6.0-cdh5.16.2
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
[root@hadoop01 ~]# source /etc/profile

修改幾個配置文件:

[root@hadoop01 ~]# cd $HADOOP_HOME
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/local/jdk/11  # 配置JDK的目錄

# 配置 core
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop01:8020</value>
  </property>
</configuration>

# 配置 hdfs,設置副本因子和臨時目錄
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop/tmp</value>
  </property>
</configuration>

# 配置slave節點的ip或hostname
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/slaves
hadoop01

# 配置 yarn
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/yarn-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

# 配置MapReduce
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/mapred-site.xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

# 建立hadoop的臨時目錄
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# mkdir -p /data/hadoop/tmp

應用HDFS的配置:

[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./bin/hdfs namenode -format

啓動全部組件:

[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./sbin/start-all.sh

啓動成功後查看進程:

[root@hadoop01 ~]# jps
3344 SecondaryNameNode
2722 NameNode
3812 Jps
3176 DataNode
3578 NodeManager
3502 ResourceManager
[root@hadoop01 ~]#

而後在瀏覽器中訪問HDFS的web界面,默認端口是50070:
Flink部署及做業提交(On YARN)

接着訪問HDFS的YARN界面,默認端口是8088:
Flink部署及做業提交(On YARN)

測試HDFS可否正常讀寫:

[root@hadoop01 ~]# hadoop fs -put anaconda-ks.cfg /  # 任意put一個文件到hdfs
[root@hadoop01 ~]# hadoop fs -ls /    # 查看hdfs中是否有該文件
Found 1 items  
-rw-r--r--   1 root supergroup       1269 2020-09-29 17:45 /anaconda-ks.cfg

通過測試,確認Hadoop環境是運行正常以後,咱們就能夠嘗試將Flink應用放到YARN上運行了。


Flink on YARN兩種方式

Flink on YARN 有兩種模式:Session模式和Per-Job模式。在Session模式中多個 JobManager 共享 Dispatcher 和 YarnResourceManager。在這種模式下,須要先向 YARN 申請資源,初始化一個常駐服務在 YARN 上,後續提交的Job都將運行在這個Session上:
Flink部署及做業提交(On YARN)

而Per-Job模式則相反,一個 JobManager 獨享 Dispatcher 和 YarnResourceManager。也就是說每提交一個Job都新建一個Session,不一樣Job之間的資源是隔離的,不會互相影響:
Flink部署及做業提交(On YARN)

想要深刻了解的話能夠參考官方文檔:


Flink on YARN Session模式實操

首先將在 Flink部署及做業提交(On Flink Cluster) 一文中編譯好的Flink目錄拷貝到當前部署了Hadoop環境的機器上:

[root@hadoop01 ~]# scp -r 192.168.243.148:/usr/local/src/flink-release-1.11.2/flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

配置環境變量,不然Flink會報找不到Hadoop相關Class的異常:

[root@hadoop01 ~]# vim /etc/profile
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mepreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*

而後執行./bin/yarn-session.sh --help命令測試一下可否正常輸出幫助信息:

[root@hadoop01 ~]# cd /usr/local/flink/
[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh --help
...
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
[root@hadoop01 /usr/local/flink]#
  • 若是沒配環境變量的話,執行這條命令就會報找不到類的錯誤

確認Flink能夠正常找到Hadoop後,使用以下命令在 YARN 上建立一個常駐服務:

[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh -jm 1024m -tm 2048m
...

JobManager Web Interface: http://hadoop01:37525    # 建立成功的話會輸出JobManager的web訪問地址
  • -jm:指定JobManager須要的內存資源
  • -tm:指定TaskManager須要的內存資源

使用瀏覽器打開 YARN 的web界面,正常狀況下會有以下應用:
Flink部署及做業提交(On YARN)

點擊應用右邊的 「ApplicationMaster」 能夠跳轉到Flink的dashboard。此時能夠看到Flink Dashboard頁面上任何數字都是0,應該就能看得出實際這只是啓動了一個JobManager:
Flink部署及做業提交(On YARN)

  • Tips:要想頁面可以正常跳轉,還得在瀏覽器所在主機的hosts文件中配置一下hadoop01這個主機名到IP的映射關係

接下來咱們嘗試一下提交做業到 YARN 上運行,首先準備好官方提供的測試文件,並put到HDFS中:

[root@hadoop01 ~]# wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
[root@hadoop01 ~]# hadoop fs -copyFromLocal LICENSE-2.0.txt /

而後執行以下命令,提交一個Word Count做業:

[root@hadoop01 ~]# cd /usr/local/flink/
[root@hadoop01 /usr/local/flink]# ./bin/flink run -m hadoop01:37525 ./examples/batch/WordCount.jar \
       --input hdfs://hadoop01:8020/LICENSE-2.0.txt --output hdfs://hadoop01:8020/wordcount-result.txt
  • Tips:這裏的hadoop01:37525,是執行完yarn-session.sh命令輸出的JobManager的訪問地址

執行完成後,控制檯會輸出以下內容:

Job has been submitted with JobID 2240e11994cf8579a78e16a1984f08db
Program execution finished
Job with JobID 2240e11994cf8579a78e16a1984f08db has finished.
Job Runtime: 10376 ms

此時到「Completed Jobs」頁面中,能夠看到運行完成的做業及其信息:
Flink部署及做業提交(On YARN)

除此以外,咱們還能夠查看該做業輸出到HDFS中的結果文件:

[root@hadoop01 /usr/local/flink]# hadoop fs -ls /wordcount-result.txt
-rw-r--r--   1 root supergroup       4499 2020-09-29 20:25 /wordcount-result.txt
[root@hadoop01 /usr/local/flink]# hadoop fs -text /wordcount-result.txt

Flink on YARN Per-Job模式實操

首先將以前在 yarn 上運行的應用和相關進程給kill掉:

[root@hadoop01 ~]# yarn application -kill application_1601372571363_0001
[root@hadoop01 ~]# jps
6995 SecondaryNameNode
7204 ResourceManager
7305 NodeManager
11291 Jps
6734 NameNode
6830 DataNode
8942 FlinkYarnSessionCli
[root@hadoop01 ~]# kill 8942

Per-Job模式更簡單,由於是提交一個做業就建立一次資源的,因此直接運行以下命令就能夠提交一個Flink的Word Count做業到 yarn 上,不須要像Session模式那樣事先去建立資源:

[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

做業運行完成後,控制檯會輸出一堆統計結果。此時在 yarn 上能夠看到該做業已經執行完成:
Flink部署及做業提交(On YARN)


Flink Scala Shell的簡單使用

在以前的演示中能夠看到,提交的Flink做業都是以jar包形式存在的。若是咱們在實際開發中,須要頻繁修改代碼提交到 yarn 上測試,那麼就得頻繁的打包,相對來講就有點麻煩。那麼Flink有沒有像Spark那樣提供相似於 Spark Shell 的交互式編程終端用於簡單的代碼測試呢?答案是有的,Flink提供了PyFlink Shell和Scala Shell,能夠執行Python和Scala代碼。

這裏簡單演示下Flink Scala Shell的使用,執行以下命令打開Flink Scala Shell:

[root@hadoop01 /usr/local/flink]# ./bin/start-scala-shell.sh local
  • 這裏的local表示在本地運行,除此以外還能夠選擇remoteyarn,具體可使用--help參數進行查看

shell裏調用API的方式仍是同樣的,只是環境變成了內置的變量,例如這裏使用的benv就表示批處理的env:

scala> val dataSet = benv.readTextFile("file:///root/LICENSE-2.0.txt")
dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@3110bb19

scala> dataSet.print
相關文章
相關標籤/搜索