To study how Spark works internally and how a Spark job runs from start to finish, we begin with the spark-submit shell script that submits the user program. All of the analysis below is based on Spark 2.1.1.
When submitting a Spark job, we usually write a script like the following, which points to the spark-submit script, sets a few parameters, and then runs it:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
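For concreteness, a minimal user application that such a script might submit could look like the following (a hypothetical example, not code from the article; the class name and input path are made up):

import org.apache.spark.sql.SparkSession

// A hypothetical user program: its fully qualified name would be passed as --class
// and the jar it is packaged into as <application-jar>.
object WordCountApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WordCountApp").getOrCreate()
    val counts = spark.sparkContext
      .textFile(args(0))                  // input path arrives via [application-arguments]
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.take(10).foreach(println)
    spark.stop()
  }
}

This object would be packaged into <application-jar>, and its name passed as <main-class>.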
The submission script above simply passes its arguments on to the spark-submit script for execution. Let's take a look at spark-submit:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
At the end, the script uses exec to run "${SPARK_HOME}"/bin/spark-class with the class org.apache.spark.deploy.SparkSubmit; the trailing "$@" forwards all of the arguments the script was invoked with.
Through the spark-class script, the command that is ultimately executed specifies org.apache.spark.deploy.SparkSubmit as the program's entry point.
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
As the main method shows, it pattern-matches on the action field of the parsed arguments and dispatches to the corresponding method. In our case the action is SUBMIT, so the submit method is called.
The submit method does two things: first, it uses the cluster manager and deploy mode to decide which class's main method to run next; second, it invokes that main method via reflection.
This part prepares the class and arguments to be executed in the next step:
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
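The snippet above is truncated: the remainder of submit defines a doRunMain helper and decides when to call it, as shown further below. As a rough, self-contained sketch of this two-step shape, with all names here (SubmitShapeSketch, the jar path, the example classes) being illustrative stand-ins rather than Spark source:

// Illustrative stand-ins showing the two-step shape of submit: prepare a
// (childArgs, childClasspath, sysProps, childMainClass) tuple, then run it.
object SubmitShapeSketch {
  def prepareSubmitEnvironment(master: String, mainClass: String,
      appArgs: Seq[String]): (Seq[String], Seq[String], Map[String, String], String) = {
    // In standalone cluster mode the "child" main class is a Spark launcher class,
    // and the user's own main class becomes one of its arguments.
    val childArgs = Seq("launch", master, "file:/tmp/app.jar", mainClass) ++ appArgs
    (childArgs, Seq("file:/tmp/app.jar"), Map("spark.master" -> master), "org.apache.spark.deploy.Client")
  }

  def doRunMain(childArgs: Seq[String], childClasspath: Seq[String],
      sysProps: Map[String, String], childMainClass: String): Unit =
    println(s"would invoke $childMainClass.main(${childArgs.mkString(", ")}) via reflection")

  def main(args: Array[String]): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) =
      prepareSubmitEnvironment("spark://master:7077", "com.example.WordCountApp", Seq("hdfs:///input"))
    doRunMain(childArgs, childClasspath, sysProps, childMainClass)
  }
}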
submit calls prepareSubmitEnvironment to prepare the main class and arguments for the next step. Looking inside that method, the following part matches the master and deploy-mode arguments to determine the corresponding clusterManager and deploy mode:
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) : (Seq[String], Seq[String], Map[String, String], String) = {
// the four values to be returned
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
// pattern-match the master argument from the script to determine the clusterManager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
-1
}
// pattern-match the deployMode argument to determine the deploy mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
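As a side note, the guard-based matching above can be exercised in isolation; the following toy, runnable version uses local stand-in constants rather than Spark's actual flag values:

// Runnable illustration of matching a master URL with pattern guards.
object MasterMatchDemo {
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8   // stand-in constants

  def resolve(master: String): Int = master match {
    case "yarn"                     => YARN
    case m if m.startsWith("spark") => STANDALONE   // e.g. spark://host:7077
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("local") => LOCAL        // e.g. local[4]
    case _ => sys.error("Master must either be yarn or start with spark, mesos, local")
  }

  def main(args: Array[String]): Unit =
    Seq("yarn", "spark://master:7077", "local[4]").foreach(m => println(s"$m -> ${resolve(m)}"))
}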
Next, based on the cluster manager and deploy mode matched above, it decides how to submit the application. Here we focus on the standalone cluster deploy mode; see the following code:
// childMainClass and argument setup in standalone cluster mode
if (args.isStandaloneCluster) {
// if useRest is set, submit the application via RestSubmissionClient
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// otherwise submit the application via the Client class
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
In standalone cluster mode there are two submission gateways:
1. the traditional RPC gateway, which uses org.apache.spark.deploy.Client as a wrapper;
2. the REST-based gateway introduced in Spark 1.3.
Our arguments are now prepared, and since we are using the standalone cluster deploy mode, the following code decides what to execute next:
/* In standalone cluster mode, there are two submission gateways:
 *  1. the traditional RPC gateway, using org.apache.spark.deploy.Client as a wrapper
 *  2. the REST-based gateway introduced in Spark 1.3
 * The latter is the default behavior as of Spark 1.3, but spark-submit will fail over
 * to the legacy gateway if the master is not a REST server.
 */
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
} else {
// In all other modes, just call doRunMain directly
doRunMain()
}
Next, doRunMain is called, which internally just calls runMain, so let's look at runMain directly.
// This method simply takes the values prepared by prepareSubmitEnvironment above and,
// via reflection, runs the class and method to be executed next
private def runMain( childArgs: Seq[String], childClasspath: Seq[String], sysProps: Map[String, String], childMainClass: String, verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
// invoke the prepared mainClass's main method via reflection
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
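To isolate the reflection mechanism that runMain relies on, here is a small self-contained sketch (not Spark source; ReflectiveLaunch and its command-line usage are hypothetical) that loads a class by name and invokes its static main method:

import java.lang.reflect.Modifier

// Standalone illustration: load a class by name and invoke its static
// main(Array[String]) via reflection, as runMain does for childMainClass.
object ReflectiveLaunch {
  def launch(mainClassName: String, args: Array[String]): Unit = {
    val mainClass = Class.forName(mainClassName)               // e.g. "org.apache.spark.deploy.Client"
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    mainMethod.invoke(null, args)                              // null receiver because main is static
  }

  def main(cmdLine: Array[String]): Unit = {
    // Hypothetical usage: first argument is the class to launch, the rest are its arguments.
    launch(cmdLine.head, cmdLine.tail)
  }
}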
We chose standalone cluster mode for this analysis. From prepareSubmitEnvironment above we know that childMainClass is org.apache.spark.deploy.Client, and from the code above we know the next step is to pass the prepared arguments into the main method of org.apache.spark.deploy.Client and execute it.
So let's now look at org.apache.spark.deploy.Client.
Client is used to launch and kill driver programs in a standalone cluster.
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
// create the RpcEnv
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
// obtain references to the master's RpcEndpoint, used for RPC communication with the master
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
// register the ClientEndpoint; its onStart method will be invoked
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
// block until the RpcEnv is terminated
rpcEnv.awaitTermination()
}
Here the RpcEnv is created. For background on Spark's RPC layer, see the earlier article Spark 中的 RPC. The client obtains references to the master's RpcEndpoint and then registers its own endpoint, which triggers a call to ClientEndpoint's onStart method.
ClientEndpoint is a ThreadSafeRpcEndpoint; let's look at its onStart method.
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
// truncate filesystem paths similar to what YARN does. For now, we just require
// people call `addJar` assuming the jar is in the same directory.
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
// wrap classPathEntries, libraryPathEntries, javaOpts and driverArgs into a Command
// here mainClass is org.apache.spark.deploy.worker.DriverWrapper
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
// wrap driverArgs and command into a DriverDescription
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
// send RequestSubmitDriver to the master to register the driver
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
case "kill" =>
val driverId = driverArgs.driverId
ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}
Here too, the code pattern-matches on cmd. If the command is launch, it collects the driver's extra Java options, classpath, and library path settings, wraps them into a Command object, then packages the driver arguments together with that command into a DriverDescription object, and finally calls ayncSendToMasterAndForwardReply to send the message.
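To make the packaging step concrete, here is a self-contained sketch with simplified stand-in case classes (they mimic, but are not, Spark's Command and DriverDescription; the user class name and jar URL are made up):

// Simplified stand-ins to illustrate how the launch information is packaged
// before being sent to the master.
case class Command(mainClass: String, arguments: Seq[String], environment: Map[String, String],
    classPathEntries: Seq[String], libraryPathEntries: Seq[String], javaOpts: Seq[String])

case class DriverDescription(jarUrl: String, mem: Int, cores: Int, supervise: Boolean, command: Command)

object PackagingDemo {
  def main(args: Array[String]): Unit = {
    // The worker will run DriverWrapper, which in turn runs the user's main class.
    val command = Command(
      mainClass = "org.apache.spark.deploy.worker.DriverWrapper",
      arguments = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.WordCountApp"),
      environment = sys.env.toMap,
      classPathEntries = Seq.empty,
      libraryPathEntries = Seq.empty,
      javaOpts = Seq("-Dspark.master=spark://master:7077"))
    val driverDescription = DriverDescription(
      jarUrl = "hdfs://namenode:8020/apps/word-count.jar",
      mem = 1024, cores = 1, supervise = false, command = command)
    println(driverDescription)
  }
}

The actual message is then sent to the master by ayncSendToMasterAndForwardReply: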
private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
for (masterEndpoint <- masterEndpoints) {
masterEndpoint.ask[T](message).onComplete {
case Success(v) => self.send(v)
case Failure(e) =>
logWarning(s"Error sending messages to master $masterEndpoint", e)
}(forwardMessageExecutionContext)
}
}
This method simply asks each masterEndpoint with the message and forwards the successful reply back to the ClientEndpoint itself.
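The same ask-then-forward pattern can be sketched with plain Scala Futures; the following is a toy, self-contained illustration (askMaster and the master URLs are made up; Spark's real implementation uses RpcEndpointRef.ask and self.send):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// "Ask every master, forward the successful reply back to yourself" with plain Futures.
object AskAndForwardSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for masterEndpoint.ask[T](message)
  def askMaster(master: String, message: String): Future[String] =
    Future(s"$master acknowledged $message")

  def main(args: Array[String]): Unit = {
    val masters = Seq("spark://master1:7077", "spark://master2:7077")
    masters.foreach { master =>
      askMaster(master, "RequestSubmitDriver").onComplete {
        case Success(reply) => println(s"forward to self: $reply") // ClientEndpoint does self.send(v)
        case Failure(e)     => println(s"Error sending messages to master $master: $e")
      }
    }
    Thread.sleep(1000) // crude wait so the demo JVM does not exit before the callbacks run
  }
}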
At this point, the whole spark-submit submission process is complete; what remains is to wait for the master to return the driver registration result and launch the driver.
Finally, you can take a look at the flow diagram of the spark-submit process: