spark-master source code: the schedule method

Full source of the schedule method:

 /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    // First check that the Master is in ALIVE state; if it is not, do nothing
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // Random.shuffle randomly reorders the elements of the passed-in collection
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    // Schedule drivers first. A driver is registered with the Master only when the
    // application is submitted in cluster deploy mode; in client mode the driver is
    // started locally, so it never registers with the Master and the Master never
    // schedules it. When a driver registers, its info is queued in waitingDrivers.
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var isClusterIdle = true
      var numWorkersVisited = 0
      // Keep going while the driver has not been launched and there are alive workers left to visit
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
        numWorkersVisited += 1
        // Check whether this worker has the resources the driver needs (memory, cores, etc.)
        if (canLaunchDriver(worker, driver.desc)) {
          val allocated = worker.acquireResources(driver.desc.resourceReqs)
          driver.withResources(allocated)
          // Launch the driver on this worker
          launchDriver(worker, driver)
          // Remove the launched driver from the waiting queue
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
      if (!launched && isClusterIdle) {
        logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
      }
    }
    startExecutorsOnWorkers()
  }
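The core of driver placement is the round-robin walk over a shuffled worker list. Here is a minimal standalone sketch of just that pattern, using simplified stand-ins for WorkerInfo and DriverInfo (the Worker and Driver case classes and the placeDriver helper below are hypothetical, not Spark's types):

    import scala.util.Random

    // Hypothetical stand-ins for Spark's WorkerInfo and DriverInfo
    case class Worker(id: String, freeCores: Int, freeMemoryMb: Int)
    case class Driver(id: String, cores: Int, memoryMb: Int)

    def canLaunch(w: Worker, d: Driver): Boolean =
      w.freeCores >= d.cores && w.freeMemoryMb >= d.memoryMb

    // Walk the shuffled worker list round-robin, like schedule() does:
    // advance curPos until the driver is placed or every worker has been visited
    def placeDriver(workers: Seq[Worker], driver: Driver): Option[Worker] = {
      val shuffled = Random.shuffle(workers)
      val n = shuffled.size
      var curPos = 0
      var visited = 0
      var chosen: Option[Worker] = None
      while (visited < n && chosen.isEmpty) {
        val w = shuffled(curPos)
        visited += 1
        if (canLaunch(w, driver)) chosen = Some(w)
        curPos = (curPos + 1) % n
      }
      chosen
    }

Shuffling first spreads drivers randomly across workers instead of always piling them onto the first one; the round-robin cursor then keeps successive drivers from landing on the same worker.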

Source of the launchDriver method:

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // Add the driver to the worker's in-memory bookkeeping; addDriver also adds the
    // driver's required memory and cores to the worker's used-memory and used-core counts
    worker.addDriver(driver)
    driver.worker = Some(worker)
    // Send a LaunchDriver message to the worker's RPC endpoint so that the worker starts the driver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
    // Mark the driver's state as RUNNING
    driver.state = DriverState.RUNNING
  }
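On the other end, the Worker's receive loop pattern-matches on the LaunchDriver message and spins up the driver process (in real Spark, via a DriverRunner). A simplified sketch of that message handoff, using plain Scala stand-ins rather than Spark's actual Worker and RPC classes:

    // Hypothetical message type standing in for Spark's LaunchDriver
    sealed trait MasterToWorker
    case class LaunchDriverMsg(driverId: String, memoryMb: Int, cores: Int) extends MasterToWorker

    // Sketch of the worker-side dispatch: match on the message and start the driver
    def receive(msg: MasterToWorker): Unit = msg match {
      case LaunchDriverMsg(id, mem, cores) =>
        println(s"Worker: launching driver $id with ${mem}MB and $cores cores")
        // Real Spark creates a DriverRunner here, which downloads the user jar,
        // forks the driver process, and reports state changes back to the Master
    }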

Source of startExecutorsOnWorkers:

  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // Iterate over the ApplicationInfo entries in waitingApps and only schedule
    // applications that still have cores left to allocate
    for (app <- waitingApps) {
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
      // If the cores left is less than coresPerExecutor, the cores left will not be allocated
      if (app.coresLeft >= coresPerExecutor) {
        // Filter out workers that don't have enough resources to launch an executor:
        // keep ALIVE workers, keep only those usable by this application, then
        // sort them by the number of free CPU cores in descending order
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canLaunchExecutor(_, app.desc))
          .sortBy(_.coresFree).reverse
        if (waitingApps.length == 1 && usableWorkers.isEmpty) {
          logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
        }
        // Decide how many cores to assign to each usable worker
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
        // Now that we've decided how many cores to allocate on each worker, let's allocate them:
        // the executors to launch are distributed across the workers that received cores
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
        }
      }
    }
  }
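scheduleExecutorsOnWorkers supports two placement strategies, chosen by the spreadOutApps flag: spread an app's cores thinly across as many workers as possible (the default, which tends to improve data locality), or pack them onto as few workers as possible. A simplified sketch of the two strategies (this assignCores helper is illustrative, not the real method, and ignores per-executor granularity and memory limits):

    // freeCores(i) is the number of free cores on worker i;
    // returns how many cores to assign on each worker
    def assignCores(freeCores: Array[Int], toAssign: Int, spreadOut: Boolean): Array[Int] = {
      val assigned = Array.fill(freeCores.length)(0)
      var left = toAssign
      if (spreadOut) {
        // Round-robin: one core at a time across all workers with spare capacity
        var pos = 0
        while (left > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
          if (assigned(pos) < freeCores(pos)) {
            assigned(pos) += 1
            left -= 1
          }
          pos = (pos + 1) % freeCores.length
        }
      } else {
        // Pack: fill each worker completely before moving to the next
        for (pos <- freeCores.indices if left > 0) {
          val take = math.min(left, freeCores(pos))
          assigned(pos) = take
          left -= take
        }
      }
      assigned
    }

For example, assignCores(Array(8, 8, 8), 6, spreadOut = true) yields Array(2, 2, 2), while spreadOut = false yields Array(6, 0, 0).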

Further reading: 中華石杉, 《Spark 從入門到精通》, Lecture 48

 

To understand the source above, you need to know how these pieces relate:

A Spark standalone cluster can have multiple Master nodes (one active, the rest standby for HA) and multiple Worker nodes. The Master manages the Workers, and each Worker manages its Executors.

A single Worker node can host multiple Executors, and each Executor holds several CPU cores and a fixed amount of memory.
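The per-executor numbers come from the application's configuration. For example, with the standalone-mode settings below (values are illustrative), each executor gets 2 cores and 2g of memory, and the app is capped at 8 cores cluster-wide, so the Master would schedule at most four such executors across the workers:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("scheduling-demo")
      .set("spark.executor.cores", "2")   // coresPerExecutor in the code above
      .set("spark.executor.memory", "2g") // memory reserved per executor
      .set("spark.cores.max", "8")        // cluster-wide core cap, drives app.coresLeft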

 

Further reading: the relationship among Worker, Executor, and CPU core.
