A Spark program is submitted through the spark-submit script, so let's start there and unpack the cluster-submission process step by step.

The key line of spark-submit is:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

That is, it runs the spark-class script and passes org.apache.spark.deploy.SparkSubmit as the first argument.
1. spark-class
The crucial part is the following snippet:
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
The loop reads each ARG and appends it to CMD; the ARGs come from running "$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@". That makes org.apache.spark.launcher.Main the first Spark class that actually gets executed.
The class lives in the launcher module; a quick look at its code:
public static void main(String[] argsArray) throws Exception {
  ...
  List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
  String className = args.remove(0);
  ...
  // create the command builder
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      ...
    }
  } else {
    builder = new SparkClassCommandBuilder(className, args);
  }
  List<String> cmd = builder.buildCommand(env); // the builder parses the arguments
  ...
  // print the effective command
  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
launcher.Main
The command it prints is what ends up stored in CMD.

Then the command is executed:
exec "${CMD[@]}"
This is where the actual Spark class starts running.
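To make the handoff concrete, here is a minimal Scala sketch of the same contract (purely illustrative; the paths and arguments are made up): launcher.Main prints the command tokens separated by '\0', and the shell loop above reads them back into an argument array.

object NulHandoff {
  def main(args: Array[String]): Unit = {
    // What launcher.Main effectively emits: command tokens separated by NUL bytes.
    val cmd = Seq("/usr/lib/jvm/java/bin/java", "-cp", "/opt/spark/jars/*",
      "org.apache.spark.deploy.SparkSubmit", "--master", "local[*]", "app.jar")
    val wire = cmd.mkString("\u0000")

    // What the `while IFS= read -d ''` loop does: cut the stream back into tokens.
    val parsed = wire.split('\u0000').toSeq
    assert(parsed == cmd)
    parsed.foreach(println)
  }
}

NUL is used as the separator because, unlike spaces or newlines, it cannot appear inside a legitimate argument, so arguments containing spaces survive the round trip intact.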
2. The deploy.SparkSubmit class
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        ...
      }
    } else {
      runMain(args, uninitLog)
    }
  }
  doRunMain()
}
The actual submission of the Spark jar is done by runMain(args, uninitLog), either directly or wrapped in proxyUser.doAs when a proxy user is configured.

So the next step is to see what runMain does:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    ...
  }
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    ...
  }
}
Now the picture is clear. runMain does the following: obtain the submit class loader and add the jar dependencies to its classpath; load the main class; instantiate it directly as a SparkApplication if it implements that interface, or otherwise wrap it in a JavaMainApplication; and finally call app.start on the resulting instance.
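Stripped of the Spark-specific pieces, the pattern looks roughly like this (a minimal sketch: AppLike, the jar path and com.example.Main are made-up stand-ins for SparkApplication, the child classpath and childMainClass):

import java.net.{URL, URLClassLoader}

// A stand-in for SparkApplication: any pluggable "application" contract.
trait AppLike {
  def start(args: Array[String]): Unit
}

object ReflectiveLaunch {
  def main(sysArgs: Array[String]): Unit = {
    // Build a class loader that also sees the user's jars (path is hypothetical).
    val extraJars = Seq(new URL("file:/tmp/user-app.jar"))
    val loader = new URLClassLoader(extraJars.toArray, getClass.getClassLoader)

    // Load the main class by name, the way Utils.classForName does.
    val mainClass = Class.forName("com.example.Main", true, loader)

    // Decide whether it already implements the application contract,
    // or needs to be wrapped so that its static main() is invoked instead.
    val app: AppLike =
      if (classOf[AppLike].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[AppLike]
      } else {
        new AppLike {
          def start(args: Array[String]): Unit = {
            val m = mainClass.getMethod("main", classOf[Array[String]])
            m.invoke(null, args)
          }
        }
      }

    app.start(sysArgs)
  }
}

The assignability check is what allows special entry points (for example cluster-manager client classes) to implement SparkApplication directly, while ordinary user programs fall through to the static-main path.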
SparkApplication itself is just a small interface (a trait with a single start method); it is enough to look at the default implementation, JavaMainApplication, whose code is very simple:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}
It is simply a wrapper around the klass argument: it copies the SparkConf entries into system properties and then uses reflection to invoke klass's static main method. That klass is the Spark program we wrote ourselves, which always has a main method.
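For reference, a minimal example of such a user program (the name and the job itself are just illustrative):

import org.apache.spark.sql.SparkSession

// A typical user application. Because this is a Scala object, main compiles to a
// static method, which is exactly what JavaMainApplication's isStatic check expects.
object WordCountApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WordCountApp")   // the master is supplied by spark-submit, not hard-coded
      .getOrCreate()

    // A trivial job, just so there is something to run.
    val evens = spark.range(0, 1000).filter("id % 2 = 0").count()
    println(s"even numbers: $evens")

    spark.stop()
  }
}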
This has only been a rough outline; many of the details live in the launcher project. In a follow-up post I will try to unravel the launcher module in more depth.
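As a pointer for that follow-up: besides backing the shell scripts, the launcher module also exposes a public programmatic API, org.apache.spark.launcher.SparkLauncher, which assembles and runs the same spark-submit command from code. A rough sketch (the jar path, main class and master are placeholders, and SPARK_HOME must be set or supplied via setSparkHome):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherExample {
  def main(args: Array[String]): Unit = {
    val handle: SparkAppHandle = new SparkLauncher()
      .setAppResource("/path/to/user-app.jar")   // placeholder jar
      .setMainClass("com.example.WordCountApp")  // placeholder main class
      .setMaster("local[*]")                     // placeholder master
      .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
      .startApplication()                        // spawns spark-submit in a child process

    // Wait until the application reaches a terminal state.
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    println(s"final state: ${handle.getState}")
  }
}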