Spark on Yarn is the deployment mode that uses Yarn as the ClusterManager: Spark hands the management and coordination of resources (containers) over to Yarn.
Spark on Yarn comes in two flavors, client mode and cluster mode:
In client mode, the Driver/SparkContext instance of the Spark program runs on the machine that submits the job. That machine can sit inside or outside the Yarn cluster; it only needs to be able to communicate with the ResourceManager and have the HADOOP_CONF_DIR or YARN_CONF_DIR environment variable correctly pointing at the Yarn cluster's configuration. In production the submitting machine is usually not a node of the Yarn cluster, and as long as you control its configuration you can install whatever software, environment, and files the Spark program needs.
In cluster mode, the Driver/SparkContext instance lives inside the ApplicationMaster (AM). The AM is itself a container and can be launched on any NodeManager in the Yarn cluster, so by default we would have to prepare the full runtime environment the Spark program needs on every node.
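A quick way to see the difference in practice is to check, from inside the job, which deploy mode was used and where the driver ended up. The snippet below is only a minimal sketch (the file and app names are made up): in client mode it prints the hostname of the submitting machine, in cluster mode the hostname of the NodeManager that hosts the AM.

# where_is_my_driver.py -- minimal sketch, app name is arbitrary
import socket
from pyspark import SparkContext

sc = SparkContext(appName="WhereIsMyDriver")
# spark-submit sets spark.submit.deployMode to "client" or "cluster"
print("deploy mode :", sc.getConf().get("spark.submit.deployMode", "unknown"))
# client mode: the submitting machine; cluster mode: the AM's NodeManager
print("driver host :", socket.gethostname())
sc.stop()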
Python offers a very rich set of numerical and machine-learning libraries such as numpy, pandas, and scipy. More and more colleagues want to use these efficient libraries to develop algorithms and then run them as PySpark programs on our Spark cluster.
For Spark programs written in Scala/Java, we can bundle our dependencies together with the main program into a single fat jar; after submitting it with spark-submit, those dependencies are distributed to all nodes through Yarn's Distributed Cache and the job just runs.
For Spark programs written in Python, external dependencies are much more awkward: Python itself effectively comes as two languages (Python 2 and Python 3), and asking IT operations to install every dependency you need on every NodeManager node is a painful exercise.
See the official documentation:

"For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg."
--py-files solves part of the dependency problem, but in some scenarios it is inconvenient or simply not enough: it cannot, for example, ship the Python interpreter itself or compiled extension packages.
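For completeness, the same mechanism is also available programmatically: SparkContext.addPyFile() ships a .py or .zip file to the executors and puts it on their PYTHONPATH, which is handy for small pure-Python helpers. The sketch below assumes a hypothetical deps.zip containing a module named mymodule; it will not help with packages that need compiled code.

# py_files_sketch.py -- sketch only; deps.zip and mymodule are hypothetical
from pyspark import SparkContext

sc = SparkContext(appName="PyFilesSketch")
# equivalent to passing --py-files deps.zip to spark-submit
sc.addPyFile("hdfs:///tmp/hadoop-username/projname/deps.zip")

def use_dep(x):
    # import on the executor, after the zip has been shipped
    import mymodule  # hypothetical pure-Python module inside deps.zip
    return mymodule.transform(x)

print(sc.parallelize(range(5)).map(use_dep).collect())
sc.stop()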
The community has discussed these problems as well; for details see this ticket: https://issues.apache.org/jira/browse/SPARK-13587
There is not much material on how PySpark works internally; the wiki is a good starting point:
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
Have a look at the diagram in the link above: the left and right halves are the driver and the executors, and the white and green boxes are the Python and Java sides. Whether PySpark runs on Yarn in client or cluster mode, Python processes are running on both the driver side and the executor side, which means every node in the cluster needs a matching Python environment with the right dependencies.
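You can observe this directly from a job: each executor forks Python worker processes, and they report whatever interpreter they were started with. The following is a small sketch (file and app names are arbitrary) that collects the interpreter path and version seen on the executors and compares them with the driver's.

# which_python.py -- sketch: show the Python processes on driver and executors
import sys
from pyspark import SparkContext

def worker_python(_):
    # runs inside the Python worker forked by each executor
    import socket, sys
    return (socket.gethostname(), sys.executable, "%d.%d" % sys.version_info[:2])

sc = SparkContext(appName="WhichPython")
print("driver   :", sys.executable, "%d.%d" % sys.version_info[:2])
for host, exe, ver in set(sc.parallelize(range(8), 8).map(worker_python).collect()):
    print("executor :", host, exe, ver)
sc.stop()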
From the point of view of flexibility, and summarizing earlier discussions, here is a way to create the Python runtime and its dependencies at run time.
1. Download Anaconda
https://www.anaconda.com/download/#linux
2. Install Anaconda
sh Anaconda2-5.0.1-Linux-x86_64.sh
3. Create the required dependency environment with conda create
/home/username/anaconda3/bin/conda create --prefix=/home/username/name1/tools/anaconda2/envs/projname_py36 python==3.6

# check that the environment was created
$ conda env list | grep projname_py36
The first time, downloading these dependencies can take quite a while depending on your network; subsequent runs are much faster.
du -sh projname_py36
847M    projname_py36
As you can see, the whole dependency environment is fairly large, which is not ideal for latency-sensitive scenarios; dependencies you do not need can simply be left out when creating the environment. We also zip it, which saves some network overhead, and if we put the environment onto HDFS ahead of time the upload cost disappears from the submission path altogether.
Note: if you are going to use this conda projname_py36 environment every time from now on, you can automate loading and initializing the conda configuration:

#!/usr/bin/env bash
# set_conda_env.sh
export CONDA_HOME=/home/username/anaconda3
export JAVA_HOME=/opt/soft/jdk/jdk1.7.0_40
export JRE_HOME=/opt/soft/jdk/jdk1.7.0_40/jre
export HADOOP_HOME=/usr/lib/software/hadoop
export SPARK_HOME=/usr/lib/software/spark/spark-2.1
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH:$CONDA_HOME/bin/conda
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=/home/username/anaconda3/bin:$PATH

source activate base
conda deactivate
conda activate /home/username/ooxx/tools/anaconda2/envs/projname_py36
export PYTHONUTF8=1

Then run:

. set_conda_env.sh
4. Package the whole environment and its dependencies and upload them to HDFS
zip -r projname_py36_env.zip ./projname_py36/

#unzip -t projname_py36_env.zip | grep bin/python
#    testing: projname_py36/bin/python3            OK
#    testing: projname_py36/bin/python3.6m         OK
#    testing: projname_py36/bin/python3.6          OK
#    testing: projname_py36/bin/python             OK
#    testing: projname_py36/bin/python3-config     OK
#    testing: projname_py36/bin/python3.6m-config  OK
#    testing: projname_py36/bin/python3.6-config   OK

hadoop fs -put projname_py36_env.zip /tmp/hadoop-username/projname/
hadoop fs -ls /tmp/hadoop-username/projname/
rm -rf projname_py36_env.zip
With that in place we can use --archives hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv to upload the Python interpreter and its dependencies and have them distributed into the working directory of every Spark process (the part after # is the alias directory the archive is unpacked into).
To save time, let's grab an example straight from the Spark example code and submit it in cluster mode:
# correlations_example1.py
from __future__ import print_function

import numpy as np

from pyspark import SparkContext
# $example on$
from pyspark.mllib.stat import Statistics
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="CorrelationsExample")  # SparkContext

    # $example on$
    seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series
    # seriesY must have the same number of partitions and cardinality as seriesX
    seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

    # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

    data = sc.parallelize(
        [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
    )  # an RDD of Vectors

    # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print(Statistics.corr(data, method="pearson"))

    rdd = sc.parallelize([str(Statistics.corr(data, method="pearson"))])
    rdd.saveAsTextFile("hdfs://hdp-66-cluster/tmp/username/name2/correlations_example.txt")
    # $example off$

    sc.stop()
# Full cluster-mode submission flow

(1) cp /usr/lib/software/spark/spark-2.1/examples/src/main/python/mllib/correlations_example.py ~/projname/correlations_example1.py

(2) /usr/lib/software/spark/spark-2.3.2-bin-U1.1/bin/spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --queue root.spark.username.spark \
      --num-executors 8 \
      --executor-cores 1 \
      --archives hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv \
      --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
      --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
      --conf spark.executorEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
      --conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
      ~/projname/correlations_example1.py
      # add --py-files a.py,b.py,c.py here if you also need to ship individual Python files

# Note: the path given to --archives is treated as a local path by default; only with an
# hdfs://host:port/path prefix is the file looked up on HDFS.

(3) hadoop fs -cat hdfs://hdp-66-cluster/tmp/hadoop-username/projname/correlations_example.txt/*
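To double-check that the executors really picked up the interpreter from the unpacked archive rather than some system Python, a tiny job like the one below can be submitted with exactly the same --archives and PYSPARK_PYTHON settings as in step (2). This is only a sketch (file name, app name and partition count are illustrative); every executor should report a path ending in PyEnv/projname_py36/bin/python and a working numpy import.

# check_env.py -- sketch: verify the distributed conda env is actually used
from pyspark import SparkContext

def probe(_):
    import sys
    import numpy
    return (sys.executable, numpy.__version__)

sc = SparkContext(appName="CheckPyEnv")
for exe, np_version in set(sc.parallelize(range(8), 8).map(probe).collect()):
    print("executor python:", exe, "numpy:", np_version)
sc.stop()

In cluster mode the printed lines end up in the driver container's stdout, so fetch them with yarn logs -applicationId <appId> after the job finishes.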