官方文檔:html
在上一篇 Flink部署及做業提交(On Flink Cluster) 文章中,咱們介紹瞭如何編譯部署Flink自身的資源分配和管理系統,並將做業提交到該系統上去運行。但一般來說這種方式用得很少,由於在企業中,可能會使用不一樣的分佈式計算框架,如Spark、Storm或MapReduce等。java
若是每一種框架都須要搭建各自的資源分配和管理系統,就沒法共享資源,致使資源利用率低。而且大多企業通常會使用Hadoop生態的相關組件作做爲大數據處理平臺的底座,如HDFS、Hive、YARN等。node
其中 YARN 是資源調度框架、通用的資源管理系統,能夠爲上層應用提供統一的資源管理和調度,Spark、Flink、Storm等計算框架均可以集成到 YARN 上。如此一來這些計算框架能夠享受總體的資源調度,進而提升集羣資源的利用率,這也就是所謂的 xxx on YARN。所以,絕大部分企業都是將計算做業放到 YARN 上進行調度,而不是每種計算框架都單獨搭一個資源分配和管理系統。這也是爲何要單獨介紹Flink On YARN的緣由。web
想要讓Flink做業跑在 YARN 上,咱們首先得搭建一個Hadoop環境,爲了簡單這裏只搭建單節點環境。我這裏使用的是CDH的Hadoop發行版。下載地址以下:shell
首先須要安裝好Java運行環境,因爲比較簡單這裏就不演示了:apache
[root@hadoop01 ~]# echo ${JAVA_HOME} /usr/local/jdk/11 [root@hadoop01 ~]# java -version java version "11.0.8" 2020-07-14 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) [root@hadoop01 ~]#
配置hosts
,將主機名與本地ip創建一個映射關係:編程
[root@hadoop01 ~]# vim /etc/hosts 192.168.243.142 hadoop01
關閉防火牆:vim
[root@hadoop01 ~]# systemctl stop firewalld && systemctl disable firewalld
配置免密登陸:api
[root@hadoop01 ~]# ssh-keygen -t rsa # 生成密鑰對 [root@hadoop01 ~]# ssh-copy-id hadoop01 # 拷貝公鑰並追加到本身的受權列表文件中
而後就能夠開始安裝Hadoop了,這裏採用 hadoop-2.6.0-cdh5.16.2 版本做爲演示,複製下載連接到系統上進行下載:瀏覽器
[root@hadoop01 ~]# cd /usr/local/src [root@hadoop01 /usr/local/src]# wget http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.16.2.tar.gz
解壓下載好的壓縮包:
[root@hadoop01 /usr/local/src]# tar -zxvf hadoop-2.6.0-cdh5.16.2.tar.gz -C /usr/local
配置系統環境變量:
[root@hadoop01 ~]# vim /etc/profile export HADOOP_HOME=/usr/local/hadoop-2.6.0-cdh5.16.2 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin [root@hadoop01 ~]# source /etc/profile
修改幾個配置文件:
[root@hadoop01 ~]# cd $HADOOP_HOME [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hadoop-env.sh export JAVA_HOME=/usr/local/jdk/11 # 配置JDK的目錄 # 配置 core [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/core-site.xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://hadoop01:8020</value> </property> </configuration> # 配置 hdfs,設置副本因子和臨時目錄 [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hdfs-site.xml <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/data/hadoop/tmp</value> </property> </configuration> # 配置slave節點的ip或hostname [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/slaves hadoop01 # 配置 yarn [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/yarn-site.xml <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration> # 配置MapReduce [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/mapred-site.xml <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration> # 建立hadoop的臨時目錄 [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# mkdir -p /data/hadoop/tmp
應用HDFS的配置:
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./bin/hdfs namenode -format
啓動全部組件:
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./sbin/start-all.sh
啓動成功後查看進程:
[root@hadoop01 ~]# jps 3344 SecondaryNameNode 2722 NameNode 3812 Jps 3176 DataNode 3578 NodeManager 3502 ResourceManager [root@hadoop01 ~]#
而後在瀏覽器中訪問HDFS的web界面,默認端口是50070:
接着訪問HDFS的YARN界面,默認端口是8088:
測試HDFS可否正常讀寫:
[root@hadoop01 ~]# hadoop fs -put anaconda-ks.cfg / # 任意put一個文件到hdfs [root@hadoop01 ~]# hadoop fs -ls / # 查看hdfs中是否有該文件 Found 1 items -rw-r--r-- 1 root supergroup 1269 2020-09-29 17:45 /anaconda-ks.cfg
通過測試,確認Hadoop環境是運行正常以後,咱們就能夠嘗試將Flink應用放到YARN上運行了。
Flink on YARN 有兩種模式:Session模式和Per-Job模式。在Session模式中多個 JobManager 共享 Dispatcher 和 YarnResourceManager。在這種模式下,須要先向 YARN 申請資源,初始化一個常駐服務在 YARN 上,後續提交的Job都將運行在這個Session上:
而Per-Job模式則相反,一個 JobManager 獨享 Dispatcher 和 YarnResourceManager。也就是說每提交一個Job都新建一個Session,不一樣Job之間的資源是隔離的,不會互相影響:
想要深刻了解的話能夠參考官方文檔:
首先將在 Flink部署及做業提交(On Flink Cluster) 一文中編譯好的Flink目錄拷貝到當前部署了Hadoop環境的機器上:
[root@hadoop01 ~]# scp -r 192.168.243.148:/usr/local/src/flink-release-1.11.2/flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink
配置環境變量,不然Flink會報找不到Hadoop相關Class的異常:
[root@hadoop01 ~]# vim /etc/profile export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mepreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*
而後執行./bin/yarn-session.sh --help
命令測試一下可否正常輸出幫助信息:
[root@hadoop01 ~]# cd /usr/local/flink/ [root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh --help ... Usage: Optional -at,--applicationType <arg> Set a custom application type for the application on YARN -D <property=value> use value for given property -d,--detached If present, runs the job in detached mode -h,--help Help for the Yarn session CLI. -id,--applicationId <arg> Attach to running YARN session -j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -nl,--nodeLabel <arg> Specify YARN node label for the YARN application -nm,--name <arg> Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager -t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode [root@hadoop01 /usr/local/flink]#
確認Flink能夠正常找到Hadoop後,使用以下命令在 YARN 上建立一個常駐服務:
[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh -jm 1024m -tm 2048m ... JobManager Web Interface: http://hadoop01:37525 # 建立成功的話會輸出JobManager的web訪問地址
-jm
:指定JobManager須要的內存資源-tm
:指定TaskManager須要的內存資源使用瀏覽器打開 YARN 的web界面,正常狀況下會有以下應用:
點擊應用右邊的 「ApplicationMaster」 能夠跳轉到Flink的dashboard。此時能夠看到Flink Dashboard頁面上任何數字都是0,應該就能看得出實際這只是啓動了一個JobManager:
hosts
文件中配置一下hadoop01這個主機名到IP的映射關係接下來咱們嘗試一下提交做業到 YARN 上運行,首先準備好官方提供的測試文件,並put到HDFS中:
[root@hadoop01 ~]# wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt [root@hadoop01 ~]# hadoop fs -copyFromLocal LICENSE-2.0.txt /
而後執行以下命令,提交一個Word Count做業:
[root@hadoop01 ~]# cd /usr/local/flink/ [root@hadoop01 /usr/local/flink]# ./bin/flink run -m hadoop01:37525 ./examples/batch/WordCount.jar \ --input hdfs://hadoop01:8020/LICENSE-2.0.txt --output hdfs://hadoop01:8020/wordcount-result.txt
hadoop01:37525
,是執行完yarn-session.sh
命令輸出的JobManager的訪問地址執行完成後,控制檯會輸出以下內容:
Job has been submitted with JobID 2240e11994cf8579a78e16a1984f08db Program execution finished Job with JobID 2240e11994cf8579a78e16a1984f08db has finished. Job Runtime: 10376 ms
此時到「Completed Jobs」頁面中,能夠看到運行完成的做業及其信息:
除此以外,咱們還能夠查看該做業輸出到HDFS中的結果文件:
[root@hadoop01 /usr/local/flink]# hadoop fs -ls /wordcount-result.txt -rw-r--r-- 1 root supergroup 4499 2020-09-29 20:25 /wordcount-result.txt [root@hadoop01 /usr/local/flink]# hadoop fs -text /wordcount-result.txt
首先將以前在 yarn 上運行的應用和相關進程給kill
掉:
[root@hadoop01 ~]# yarn application -kill application_1601372571363_0001 [root@hadoop01 ~]# jps 6995 SecondaryNameNode 7204 ResourceManager 7305 NodeManager 11291 Jps 6734 NameNode 6830 DataNode 8942 FlinkYarnSessionCli [root@hadoop01 ~]# kill 8942
Per-Job模式更簡單,由於是提交一個做業就建立一次資源的,因此直接運行以下命令就能夠提交一個Flink的Word Count做業到 yarn 上,不須要像Session模式那樣事先去建立資源:
[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
做業運行完成後,控制檯會輸出一堆統計結果。此時在 yarn 上能夠看到該做業已經執行完成:
在以前的演示中能夠看到,提交的Flink做業都是以jar
包形式存在的。若是咱們在實際開發中,須要頻繁修改代碼提交到 yarn 上測試,那麼就得頻繁的打包,相對來講就有點麻煩。那麼Flink有沒有像Spark那樣提供相似於 Spark Shell 的交互式編程終端用於簡單的代碼測試呢?答案是有的,Flink提供了PyFlink Shell和Scala Shell,能夠執行Python和Scala代碼。
這裏簡單演示下Flink Scala Shell的使用,執行以下命令打開Flink Scala Shell:
[root@hadoop01 /usr/local/flink]# ./bin/start-scala-shell.sh local
local
表示在本地運行,除此以外還能夠選擇remote
和yarn
,具體可使用--help
參數進行查看shell裏調用API的方式仍是同樣的,只是環境變成了內置的變量,例如這裏使用的benv
就表示批處理的env:
scala> val dataSet = benv.readTextFile("file:///root/LICENSE-2.0.txt") dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@3110bb19 scala> dataSet.print