PySpark on Yarn 的 Python 環境與包依賴解決方式

一、問題

Spark on Yarn是將yarn做爲ClusterManager的運行模式,Spark會將資源(container)的管理與協調統一交給yarn去處理。html

Spark on Yarn分爲client/cluster模式:
對於client模式,Spark程序的Driver/SparkContext實例用戶提交機上,該機器能夠位於yarn集羣以內或以外,只須要起能正常與ResourceManager通訊及正確配置HADOOP_CONF_DIR或YARN_CONF_DIR環境變量指向yarn集羣。生產環境中,一般提交機不會是yarn集羣內部的節點,手握配置權限的狀況下,能夠按需配置支撐Spark程序須要的軟件、環境、文件等。
對於cluster模式,Spark程序的Driver/SparkContext實例位於ApplicationMaster(am)中,am做爲一個container能夠起在yarn集羣中任何一個NodeManager上,默認狀況下,咱們就須要爲全部的節點機器準備好Spark程序須要的全部運行環境。java

Python提供了很是豐富的數學運算、機器學習處理庫——如numpypandasscipy等等。愈來愈多的同事但願利用這些高效的庫開發各類算法而後以PySpark程序跑到咱們的Spark上。python

對於scala/java寫的Spark程序,咱們能夠將咱們所依賴的jar一塊兒與咱們的main函數所在的主程序打成一個fat jar,經過spark-submit提交後,這些依賴就會經過Yarn的Distribute Cache分發到全部節點支撐運行。
對於python寫的Spark程序若是有外部依賴就很尷尬了,python自己就是兩種語言,在全部NodeManager節點上安裝你全部須要的依賴對於IT運維人員也是一個很是痛苦的事情。linux

參考官方文檔算法

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg
files to be distributed with your application. If you depend on multiple Python files we recommend
packaging them into a .zip or .egg.apache

--py-files,能夠解決部分依賴的問題,但對於有些場景就可能不是很方便,或者不可能實現。bash

  • 依賴太多,包括傳遞依賴
  • python包在deploy前須要依賴的C代碼提早編譯
  • 基於不一樣版本的python的pyspark跑在同一個yarn集羣上

對於這些問題 ,社區也有相關的討論,詳細能夠看下 這個ticket https://issues.apache.org/jira/browse/SPARK-13587網絡

二、原理

pyspark原理的資料比較少,能夠看下wikiapp

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals運維

能夠看下上面連接中的圖,圖中左右分爲driver/executor, 圖白色和綠色分python和java,能夠看到無論PySpark適宜client仍是cluster模式跑在yarn上,driver和executor端都有python的進程起着,這就須要集羣中的全部節點都有相應的python依賴環境。

三、方案

從靈活性的角度來說,這裏從前輩的討論中總結一下,提供一種在運行時建立python運行及相關依賴的辦法

一、下載並安裝anaconda
https://www.anaconda.com/download/#linux

二、安裝anaconda

sh Anaconda2-5.0.1-Linux-x86_64.sh

三、建立須要的依賴環境 conda create

/home/username/anaconda3/bin/conda create --prefix=/home/username/name1/tools/anaconda2/envs/projname_py36 python==3.6
# 查當作功的環境
$ conda env list |grep projname_py36

第一次根據網絡狀況下載上述這些依賴,可能會比較久,之後就會快不少。

du -sh  projname_py36
847M    projname_py36

能夠看到依賴包整個大小仍是挺大的,對於一些實時性比較高的場景這種方式其實不太有利,有些不須要的依賴在建立的時候能夠不打進去。固然咱們還須要zip壓縮一下,能夠減少部分網絡開銷。固然若是咱們把這個環境直接提早put到hdfs,也就沒有這個問題了。

# 附:若是後續每次都要使用這個 conda projname_py36 環境,能夠作成自動加載 conda 配置並初始化
#!/usr/bin/env bash
# set_conda_env.sh
export CONDA_HOME=/home/username/anaconda3
export JAVA_HOME=/opt/soft/jdk/jdk1.7.0_40
export JRE_HOME=/opt/soft/jdk/jdk1.7.0_40/jre
export HAADOOP_HOME=/usr/lib/software/hadoop
export SPARK_HOME=/usr/lib/software/spark/spark-2.1
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH:$CONDA_HOME/bin/conda 
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH

export PATH=/home/username/anaconda3/bin:$PATH
source activate base
conda deactivate
conda activate /home/username/ooxx/tools/anaconda2/envs/projname_py36

export PYTHONUTF8=1

而後執行:
. set_conda_env.sh

四、環境與依賴總體打包上傳 HDFS

zip -r projname_py36_env.zip ./projname_py36/
#unzip -t projname_py36_env.zip|grep bin/python
#    testing: projname_py36/bin/python3   OK
#    testing: projname_py36/bin/python3.6m   OK
#    testing: projname_py36/bin/python3.6   OK
#    testing: projname_py36/bin/python    OK
#    testing: projname_py36/bin/python3-config   OK
#    testing: projname_py36/bin/python3.6m-config   OK
#    testing: projname_py36/bin/python3.6-config   OK
hadoop fs -put projname_py36_env.zip /tmp/hadoop-username/projname/
hadoop fs -ls /tmp/hadoop-username/projname/
rm -rf projname_py36_env.zip

這樣咱們就能夠經過 --archives  hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv 的方式將python及其依賴環境上傳並分發到spark各個進程的working dir。

四、測試

爲了節約時間,直接從spark示例代碼裏拷一個出來測試,而且以 cluster 模式提交:

# correlations_example1.py
from __future__ import print_function

import numpy as np

from pyspark import SparkContext
# $example on$
from pyspark.mllib.stat import Statistics
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="CorrelationsExample")  # SparkContext

    # $example on$
    seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series
    # seriesY must have the same number of partitions and cardinality as seriesX
    seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

    # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

    data = sc.parallelize(
        [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
    )  # an RDD of Vectors

    # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print(Statistics.corr(data, method="pearson"))
    rdd = sc.parallelize([str(Statistics.corr(data, method="pearson"))])
    rdd.saveAsTextFile("hdfs://hdp-66-cluster/tmp/username/name2/correlations_example.txt")
    # $example off$

    sc.stop()
# 完整 cluster 提交流程
(1)
cp /usr/lib/software/spark/spark-2.1/examples/src/main/python/mllib/correlations_example.py ~/projname/correlations_example1.py

(2)
/usr/lib/software/spark/spark-2.3.2-bin-U1.1/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue root.spark.username.spark \
--num-executors 8 \
--executor-cores 1 \
--archives  hdfs://hdp-66-cluster/tmp/hadoop-username/projname/projname_py36_env.zip#PyEnv \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=PyEnv/projname_py36/bin/python \
--conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=PyEnv/projname_py36/bin/python \
#--py-files a.py,b.py,c.py \
~/projname/correlations_example1.py

#--archives後面的參數默認是找本地路徑的文件,只有加上hdfs://host:port/path纔會找hdfs上的文件

(3)
hadoop fs -cat hdfs://hdp-66-cluster/tmp/hadoop-username/projname/correlations_example.txt/*

Refer

[1] PySpark on Yarn的相關依賴的解決方式

https://www.jianshu.com/p/df0a189ff28b

相關文章
相關標籤/搜索