Spark 2.1.1
Submit command:
spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar
Processes:
hadoop 225653 0.0 0.0 11256 364 ? S Aug24 0:00 bash /$spark-dir/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar
hadoop 225654 0.0 0.0 34424 2860 ? Sl Aug24 0:00 /$jdk_dir/bin/java -Xmx128m -cp /spark-dir/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar
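For reference, the same two-stage launch (a bash wrapper plus a small launcher JVM running org.apache.spark.launcher.Main) can also be driven programmatically through Spark's launcher library. A minimal sketch, assuming SPARK_HOME is set and the placeholder paths and class name are replaced with real ones:

import org.apache.spark.launcher.SparkLauncher

object SubmitViaLauncher {
  def main(args: Array[String]): Unit = {
    // Forks the same bin/spark-submit process seen above; paths/class names are placeholders.
    val proc = new SparkLauncher()
      .setMaster("local[10]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "30g")
      .setAppResource("/path/to/app-1.0.jar")
      .setMainClass("app.package.AppClass")
      .launch()
    // Wait for the child spark-submit process to exit.
    System.exit(proc.waitFor())
  }
}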
The first (bash) process is running the bin/spark-submit script:

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Note: spark-submit then execs another script, spark-class, shown below:
...
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")
...
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
Note: spark-class runs the Java class org.apache.spark.launcher.Main, passing the submit arguments through; the relevant code is shown below:
...
builder = new SparkSubmitCommandBuilder(help);
...
List<String> cmd = builder.buildCommand(env);
...
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
  System.out.print(c);
  System.out.print('\0');
}
...
Note: Main uses SparkSubmitCommandBuilder to generate the spark-submit command, shown below:
...
private List<String> buildSparkSubmitCommand(Map<String, String> env)
...
  addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
  addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
...
  String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
...
  if (isClientMode) {
    ...
    addOptionString(cmd, driverExtraJavaOptions);
    ...
  }
...
  addPermGenSizeOpt(cmd);
  cmd.add("org.apache.spark.deploy.SparkSubmit");
  cmd.addAll(buildSparkSubmitArgs());
  return cmd;
...
Note: this builds the local launch command around the Java class org.apache.spark.deploy.SparkSubmit and folds the various Java options (e.g. SPARK_JAVA_OPTS, DRIVER_EXTRA_JAVA_OPTIONS) into that command. SparkSubmit itself looks like this:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args) // parse command-line parameters
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

private def submit(args: SparkSubmitArguments): Unit = {
  // merge all parameters from: command line, properties file, system properties, etc.
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  def doRunMain(): Unit = {
    ...
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    ...
  }
  ...

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
    : (Seq[String], Seq[String], Map[String, String], String) = {
  if (deployMode == CLIENT || isYarnCluster) {
    childMainClass = args.mainClass
    ...
  if (isYarnCluster) {
    childMainClass = "org.apache.spark.deploy.yarn.Client"
    ...

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println

  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    ...

  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  ...
  mainMethod.invoke(null, childArgs.toArray)
  ...
Note: SparkSubmit first parses the command-line arguments (e.g. mainClass) and prepares the runtime environment, including system properties and the classpath. It then loads the user's mainClass with a fresh classloader (ChildFirstURLClassLoader when spark.driver.userClassPathFirst is set, otherwise MutableURLClassLoader) and invokes that class's main method via reflection. At this point the user's app.package.AppClass.main begins to execute.
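To make that last step concrete, here is a minimal standalone sketch (not Spark source) of the same pattern runMain uses: load the user jar with a URL classloader, then call the configured class's main method via reflection. The jar path and class name below are placeholders.

import java.net.{URL, URLClassLoader}

object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical user jar and main class, standing in for app-1.0.jar / app.package.AppClass.
    val userJar = new java.io.File("/path/to/app-1.0.jar").toURI.toURL
    val loader  = new URLClassLoader(Array[URL](userJar), Thread.currentThread.getContextClassLoader)
    Thread.currentThread.setContextClassLoader(loader)

    val mainClass  = Class.forName("app.package.AppClass", true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // main is a static method, so the receiver is null; the remaining args are passed through.
    mainMethod.invoke(null, args)
  }
}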
Once the user code constructs a SparkConf, those system properties are picked up:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)
  ...
  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
Note: this shows how Spark loads its configuration.
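A minimal sketch of that behaviour (the keys below are only examples): any spark.* system property set before a SparkConf is created, which is exactly what runMain does with sysProps, ends up in the configuration, while other properties are ignored.

import org.apache.spark.SparkConf

object ConfFromSysProps {
  def main(args: Array[String]): Unit = {
    // runMain calls System.setProperty for every merged sysProp before invoking the user's main.
    System.setProperty("spark.driver.memory", "1g")
    System.setProperty("not.a.spark.key", "ignored")

    val conf = new SparkConf() // loadDefaults = true, so spark.* system properties are loaded
    println(conf.get("spark.driver.memory"))  // 1g
    println(conf.contains("not.a.spark.key")) // false: only keys starting with "spark." are loaded
  }
}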
Example: submitting with the --verbose flag:

spark-submit --master local[*] --class app.package.AppClass --jars /$other-dir/other.jar --driver-memory 1g --verbose app-1.0.jar

Sample output:
Main class:
app.package.AppClass
Arguments:

System properties:
spark.executor.logs.rolling.maxSize -> 1073741824
spark.driver.memory -> 1g
spark.driver.extraLibraryPath -> /$hadoop-dir/lib/native
spark.eventLog.enabled -> true
spark.eventLog.compress -> true
spark.executor.logs.rolling.time.interval -> daily
SPARK_SUBMIT -> true
spark.app.name -> app.package.AppClass
spark.driver.extraJavaOptions -> -XX:+PrintGCDetails -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:-UseCompressedClassPointers -XX:CompressedClassSpaceSize=3G -XX:+PrintGCTimeStamps -Xloggc:/export/Logs/hadoop/g1gc.log
spark.jars -> file:/$other-dir/other.jar
spark.sql.adaptive.enabled -> true
spark.submit.deployMode -> client
spark.executor.logs.rolling.maxRetainedFiles -> 10
spark.executor.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
spark.eventLog.dir -> hdfs://myhdfs/spark/history
spark.master -> local[*]
spark.sql.crossJoin.enabled -> true
spark.driver.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
Classpath elements:
file:/$other-dir/other.jar
file:/app-1.0.jar
Adding --verbose to the submit command prints all of this runtime information, which is very useful when diagnosing problems.