先看看代碼,這個類代碼比較短,目錄是deploy/rest/下。app
private[spark] class RestSubmissionClientApp extends SparkApplication { /** Submits a request to run the application and return the response. Visible for testing. */ def run( appResource: String, mainClass: String, appArgs: Array[String], conf: SparkConf, env: Map[String, String] = Map()): SubmitRestProtocolResponse = { val master = conf.getOption("spark.master").getOrElse { throw new IllegalArgumentException("'spark.master' must be set.") } val sparkProperties = conf.getAll.toMap val client = new RestSubmissionClient(master) val submitRequest = client.constructSubmitRequest( appResource, mainClass, appArgs, sparkProperties, env) client.createSubmission(submitRequest) } override def start(args: Array[String], conf: SparkConf): Unit = { if (args.length < 2) { sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]") sys.exit(1) } val appResource = args(0) val mainClass = args(1) val appArgs = args.slice(2, args.length) val env = RestSubmissionClient.filterSystemEnvironment(sys.env) run(appResource, mainClass, appArgs, conf, env) } }
建立一個RestSubmissionClient的client,而後將消息提交給client,消息的格式爲:ide
( appResource, mainClass, appArgs, sparkProperties, env)函數
client.createSubmission(submitRequest)ui
client.createSubmission命令作哪些事呢?他就是提交消息給服務端,真實的處理者是服務端,是RestSubmissionServer類或者它的子類。對於獨立集羣來講,就是StandaloneRestServer來處理的,咱們就只看submit命令的處理邏輯就能夠了。spa
相關的函數有兩個:rest
私有方法buildDriverDescription和重寫接口方法handleSubmitcode
handleSubmit裏調用了前一個方法,最關鍵的代碼是兩行:接口
val driverDescription = buildDriverDescription(submitRequest) val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse]( DeployMessages.RequestSubmitDriver(driverDescription))
生成一個DriverDescription類型的消息,而後給Master發送RequestSubmitDriver消息,讓Master來調度執行咱們的spark程序,就是這裏的driver。ip
接下來,就進入了Master的處理流程了。get