The previous post introduced the scripts involved in starting Spark. This post analyzes how the Spark source handles job submission, stepping through from spark-submit to see the overall submission flow. First, an overview diagram of the whole process:
2.1 spark-submit
```bash
# -z tests whether the following variable is empty (true if empty); in shell, variables
# are expanded inside double quotes but not inside single quotes.
# This check: if SPARK_HOME is empty, execute the then-branch.
# source: `source filename` reads and executes the commands in filename in the current bash environment.
# $0 is the file name of the shell script itself, here spark-submit.
# dirname returns the directory part of a path, so `dirname $0` is the directory containing this script.
# $(command) substitutes the output of that command.
# So the whole if statement means: if SPARK_HOME has no value, run the find-spark-home
# script located next to this one to set SPARK_HOME.
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Run the spark-class script, passing org.apache.spark.deploy.SparkSubmit and "$@";
# "$@" here is every argument that was originally passed to spark-submit.
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```
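To make the pass-through concrete, here is a hypothetical invocation; the class, master, and jar path are placeholders for illustration only. Because of the final exec line, the two commands below are equivalent:

```bash
# Hypothetical submission; class, master and jar path are placeholders
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  examples/jars/spark-examples_2.11-2.3.0.jar 100

# Because of the exec line above, this is the command that actually runs
"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  examples/jars/spark-examples_2.11-2.3.0.jar 100
```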
So the overall logic of the spark-submit script is: first check whether SPARK_HOME is set; if it is already set, run spark-class directly, otherwise source find-spark-home to set it first. The find-spark-home script is as follows:
```bash
# Define a variable pointing to the Python script that can determine SPARK_HOME
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"

# Short circuit if the user already has this set.
# If SPARK_HOME is not empty, exit successfully right away
if [ ! -z "${SPARK_HOME}" ]; then
  exit 0
# -f tests whether the file exists and is a regular file; if find_spark_home.py is not
# present, fall through and derive SPARK_HOME from the script location
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
  # If we are not in the same directory as find_spark_home.py we are not pip installed so we don't
  # need to search the different Python directories for a Spark installation.
  # Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or
  # spark-submit in another directory we want to use that version of PySpark rather than the
  # pip installed version of PySpark.
  export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
else
  # We are pip installed, use the Python script to resolve a reasonable SPARK_HOME
  # Default to standard python interpreter unless told otherwise
  if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
    PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
  fi
  export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
fi
```
As you can see, even if the user has not set SPARK_HOME beforehand, the script determines it automatically and exports it as an environment variable for the rest of the launch sequence.
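To make the branch order explicit, here is a condensed restatement of the same resolution logic. The function name resolve_spark_home is made up purely for illustration; the real script sets the variable inline as shown above.

```bash
# Illustrative restatement of find-spark-home's three branches (not the real script)
resolve_spark_home() {
  if [ -n "${SPARK_HOME}" ]; then
    # 1. Respect a SPARK_HOME the user already exported
    echo "${SPARK_HOME}"
  elif [ ! -f "$(dirname "$0")/find_spark_home.py" ]; then
    # 2. Normal distribution layout: SPARK_HOME is the parent of bin/
    echo "$(cd "$(dirname "$0")"/..; pwd)"
  else
    # 3. pip-installed PySpark: ask find_spark_home.py where Spark lives
    "${PYSPARK_DRIVER_PYTHON:-${PYSPARK_PYTHON:-python}}" "$(dirname "$0")/find_spark_home.py"
  fi
}
export SPARK_HOME="$(resolve_spark_home)"
```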
Once SPARK_HOME is set, spark-class is executed. This is the core of the analysis; its source is as follows:
```bash
#!/usr/bin/env bash

# Again, check and if necessary set SPARK_HOME
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Run load-spark-env.sh, whose main purpose is to load and set a few variables:
# it exports the variables defined in spark-env.sh into the environment for later use
# and sets the Scala version variable
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
# Check and set the java executable
# -n tests whether the variable length is non-zero (true if non-empty)
# If JAVA_HOME is not set but Java is installed, `command -v java` resolves the java binary from PATH
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
# -d tests whether the path is a directory
# Build the classpath used to run the launcher
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
# Run the class org.apache.spark.launcher.Main, which parses the arguments and prints the final command
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
# Collect the arguments produced by build_command into the CMD array
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
# exec the built command, whose main class here is org.apache.spark.deploy.SparkSubmit
exec "${CMD[@]}"
```
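If you want to see what build_command actually produces, you can invoke the launcher class by hand the same way the script does and turn the NUL separators into newlines. This sketch assumes a standard $SPARK_HOME/jars layout, and the application class and jar path are placeholders:

```bash
# Manually run the launcher like build_command does and make its NUL-separated output readable;
# com.example.MyApp and /tmp/my-app.jar are placeholders
"${JAVA_HOME}/bin/java" -Xmx128m -cp "${SPARK_HOME}/jars/*" \
  org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit \
  --master local[2] --class com.example.MyApp /tmp/my-app.jar \
  | tr '\0' '\n'
```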
The logic of spark-class is somewhat more involved. At a high level it does the following:

1. Check (and if necessary set) SPARK_HOME.
2. Run load-spark-env.sh to set required environment variables such as the Scala version; this also loads spark-env.sh.
3. Check and set the path of the java executable (RUNNER).
4. Locate the Spark jars and set the launcher classpath variables.
5. Run org.apache.spark.launcher.Main, which parses the arguments and returns the resolved launch command into CMD.
6. Check the launcher's exit code, i.e. whether the user-supplied arguments were valid.
7. If everything checks out, exec the resulting command, whose main class is org.apache.spark.deploy.SparkSubmit.
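A quicker way to observe the final CMD without modifying anything is the SPARK_PRINT_LAUNCH_COMMAND switch, which makes the launcher print the fully resolved java command before it is exec'd; the application class and jar below are placeholders:

```bash
# Print the fully resolved launch command built by org.apache.spark.launcher.Main
SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-submit \
  --class com.example.MyApp --master local[2] /tmp/my-app.jar
```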
The flow in 2.1 ends with the exec statement below, which hands control to SparkSubmit (source: D:\src\spark-2.3.0\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala):
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```scala
override def main(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  // Parse the arguments passed in by the submit script
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  // Dispatch to the corresponding method based on the parsed action
  appArgs.action match {
    // Submit the application
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    // Only triggered in standalone and Mesos cluster modes
    case SparkSubmitAction.KILL => kill(appArgs)
    // Only triggered in standalone and Mesos cluster modes
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
```
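The three SparkSubmitAction branches correspond to the command-line forms below; the class, jar, master URL and driver submission ID are placeholders, and --kill / --status are only meaningful against a standalone or Mesos cluster:

```bash
# SUBMIT: the normal submission path
./bin/spark-submit --class com.example.MyApp --master spark://host:7077 /tmp/my-app.jar

# KILL: standalone / Mesos cluster mode only (submission ID is a placeholder)
./bin/spark-submit --kill driver-20180101000000-0000 --master spark://host:6066

# REQUEST_STATUS: standalone / Mesos cluster mode only
./bin/spark-submit --status driver-20180101000000-0000 --master spark://host:6066
```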
Within the SUBMIT branch, submit() then performs two main steps:

(1) call prepareSubmitEnvironment

(2) call doRunMain