The spark-submit Script

A Spark program is submitted through the spark-submit script. Starting from this script, we can unpack, step by step, how a job gets submitted to the cluster.

The essential line in spark-submit is: exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

In other words, it executes the spark-class script, passing org.apache.spark.deploy.SparkSubmit as the first argument, followed by all of the user's original arguments.
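To make the handoff concrete, here is a rough Scala sketch of what the wrapper amounts to (SparkSubmitShim and the fallback path are made up for illustration; the real implementation is the one-line shell exec above):

import scala.sys.process._

object SparkSubmitShim {
  def main(args: Array[String]): Unit = {
    val sparkHome = sys.env.getOrElse("SPARK_HOME", "/opt/spark") // fallback is illustrative
    val cmd = Seq(s"$sparkHome/bin/spark-class",
      "org.apache.spark.deploy.SparkSubmit") ++ args
    // bash exec replaces the current process; here we simply wait
    // and propagate the exit code.
    sys.exit(Process(cmd).!)
  }
}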

1. spark-class

The crucial part is the following:

# Read the NUL-delimited tokens printed by launcher.Main into the CMD array.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
# Replace the current shell process with the command that was built.
exec "${CMD[@]}"

The loop reads NUL-delimited ARG tokens and appends them to the CMD array. The tokens come from executing "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@", which is the first Spark class that actually gets executed.
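The NUL byte matters here: individual arguments may contain spaces or even newlines, and '\0' is the one byte that cannot appear inside an argument, so read -d '' can split the stream unambiguously. Below is a small Scala sketch of the same splitting logic (NulSplitDemo and its sample input are made up for illustration; it assumes ASCII input for brevity):

import java.io.{ByteArrayInputStream, InputStream}
import scala.collection.mutable.ArrayBuffer

object NulSplitDemo {
  // Split a stream into tokens at NUL bytes, mirroring what the bash loop
  // above does with launcher.Main's output.
  def readNulDelimited(in: InputStream): Seq[String] = {
    val tokens = ArrayBuffer.empty[String]
    val cur = new StringBuilder
    var b = in.read()
    while (b != -1) {
      if (b == 0) { tokens += cur.result(); cur.clear() } else cur.append(b.toChar)
      b = in.read()
    }
    tokens.toSeq
  }

  def main(args: Array[String]): Unit = {
    // A classpath entry containing spaces survives as a single token.
    val demo = "java\u0000-cp\u0000/path with spaces/*\u0000Main\u0000"
    println(readNulDelimited(new ByteArrayInputStream(demo.getBytes("US-ASCII"))))
  }
}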

launcher.Main itself lives in the launcher module. A quick look at the code:

public static void main(String[] argsArray) throws Exception {
   ...
    List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
    String className = args.remove(0);
    ...
    // create the appropriate command builder
    AbstractCommandBuilder builder;
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        ...
      }
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }

    List<String> cmd = builder.buildCommand(env); // the builder assembles the final command
    ...
    // print the resulting command back to the calling shell
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }

The data printed by launcher.Main is what ends up stored in CMD.

Then the command is executed:

exec "${CMD[@]}"

This is where a concrete Spark class is actually run; for a normal submission the resulting command is typically a plain java invocation whose main class is org.apache.spark.deploy.SparkSubmit.

2. The deploy.SparkSubmit class

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        // run as the proxy user if --proxy-user was given
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          ...
        }
      } else {
        runMain(args, uninitLog)
      }
    }

    doRunMain()
  }

The actual submission of the Spark application happens in the runMain(args, uninitLog) method.

So the next thing to look at is what runMain does:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // resolve the child arguments, classpath, conf and the actual main class
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    // build the class loader and add all jar dependencies to it
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      ...
    }

    // instantiate the class directly if it implements SparkApplication,
    // otherwise wrap its static main method in a JavaMainApplication
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      ...
    }
  }

This makes the flow clear: obtain the class loader and add the jar dependencies to it; load childMainClass; if that class implements SparkApplication, instantiate it directly, otherwise wrap it in a JavaMainApplication; finally call app.start on whichever app was created. In client mode childMainClass is simply the user's own main class, while in cluster deploy modes it is a cluster-manager-specific SparkApplication, for example YARN's YarnClusterApplication.
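Those cluster-mode entry points take the first branch: they implement SparkApplication directly instead of exposing a static main. A simplified sketch of the shape of such a class (not actual Spark source; DemoClusterApplication is made up, and it must live under the org.apache.spark package because the trait is private[spark]):

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// An entry point that would take the isAssignableFrom branch above.
class DemoClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // a real implementation would parse args and negotiate with the
    // cluster manager here
    println(s"started with master ${conf.get("spark.master", "unset")}")
  }
}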

SparkApplication itself is only an abstraction, so it is enough to look at the default JavaMainApplication; the code is very simple:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // look up main(String[]) reflectively; it must be static
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    // copy the resolved SparkConf into JVM system properties so the
    // application picks them up when it builds its own SparkContext
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }
    // invoke the user's main method (null receiver: it is static)
    mainMethod.invoke(null, args)
  }
}

It is just a wrapper around klass: it reflectively invokes the main method of whatever class was passed in. That klass is the Spark program we wrote ourselves, which always has a main method.
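For completeness, this is the kind of program that sits on the other side of that reflective call. A minimal sketch (the class name and the job are made up); note that a Scala object compiles its main into a static method, which is exactly what the Modifier.isStatic check above demands:

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    // The properties SparkSubmit copied into sys.props are picked up here.
    val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
    try {
      println(sc.parallelize(1 to 100).sum())
    } finally {
      sc.stop()
    }
  }
}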

This is only a rough outline; much of the real machinery lives in the launcher project. In a follow-up post I will try to unravel the launcher module in detail.
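As a preview, the launcher module also exposes a small public API, org.apache.spark.launcher.SparkLauncher, which drives this whole chain programmatically instead of through the shell scripts. A minimal sketch (all paths and class names are illustrative):

import org.apache.spark.launcher.SparkLauncher

object LaunchDemo {
  def main(args: Array[String]): Unit = {
    val proc = new SparkLauncher()
      .setSparkHome("/opt/spark")              // illustrative path
      .setAppResource("/opt/jobs/my-app.jar")  // illustrative path
      .setMainClass("com.example.MyApp")       // hypothetical class
      .setMaster("local[*]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
      .launch()                                // returns a java.lang.Process
    proc.waitFor()
  }
}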
