Standalone cluster模式下的RestSubmissionClientApp

先看看代碼,這個類代碼比較短,目錄是deploy/rest/下。app

private[spark] class RestSubmissionClientApp extends SparkApplication {
  /** Submits a request to run the application and return the response. Visible for testing. */
  def run(
      appResource: String,
      mainClass: String,
      appArgs: Array[String],
      conf: SparkConf,
      env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
    val master = conf.getOption("spark.master").getOrElse {
      throw new IllegalArgumentException("'spark.master' must be set.")
    }
    val sparkProperties = conf.getAll.toMap
    val client = new RestSubmissionClient(master)
    val submitRequest = client.constructSubmitRequest(
      appResource, mainClass, appArgs, sparkProperties, env)
    client.createSubmission(submitRequest)
  }

  override def start(args: Array[String], conf: SparkConf): Unit = {
    if (args.length < 2) {
      sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
      sys.exit(1)
    }
    val appResource = args(0)
    val mainClass = args(1)
    val appArgs = args.slice(2, args.length)
    val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
    run(appResource, mainClass, appArgs, conf, env)
  }
}

建立一個RestSubmissionClient的client,而後將消息提交給client,消息的格式爲:ide

( appResource, mainClass, appArgs, sparkProperties, env)函數

client.createSubmission(submitRequest)ui

client.createSubmission命令作哪些事呢?他就是提交消息給服務端,真實的處理者是服務端,是RestSubmissionServer類或者它的子類。對於獨立集羣來講,就是StandaloneRestServer來處理的,咱們就只看submit命令的處理邏輯就能夠了。spa

相關的函數有兩個:rest

私有方法buildDriverDescription和重寫接口方法handleSubmitcode

handleSubmit裏調用了前一個方法,最關鍵的代碼是兩行:接口

val driverDescription = buildDriverDescription(submitRequest)
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
          DeployMessages.RequestSubmitDriver(driverDescription))

生成一個DriverDescription類型的消息,而後給Master發送RequestSubmitDriver消息,讓Master來調度執行咱們的spark程序,就是這裏的driver。ip

接下來,就進入了Master的處理流程了。get

相關文章
相關標籤/搜索