Spark Executor Driver資源調度彙總

  1、簡介

  於Worker Actor於,每次LaunchExecutor這將建立一個CoarseGrainedExecutorBackend流程。Executor和CoarseGrainedExecutorBackend是1對1的關係。也就是說集羣裏啓動多少Executor實例就有多少CoarseGrainedExecutorBackend進程
html

  那麼到底是怎樣分配Executor的呢?怎麼控制調節Executor的個數呢?java

 2、Driver和Executor資源調度

   如下主要介紹一下Spark Executor分配策略:node

   咱們僅看。當Application提交註冊到Master後,Master會返回RegisteredApplication,以後便會調用schedule()這種方法,來分配Driver的資源。和啓動Executor的資源。git

schedule()方法是來調度當前可用資源的調度方法,它管理還在排隊等待的Apps資源的分配。這種方法是每次在集羣資源發生變更的時候都會調用,依據當前集羣最新的資源來進行Apps的資源分配。github

Driver資源調度:

  隨機的將Driver分配到空暇的Worker上去,具體流程請看我寫的凝視 :)

    // First schedule drivers, they take strict precedence over applications
    val shuffledWorkers = Random.shuffle(workers) // 把當前workers這個HashSet的順序隨機打亂
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { //遍歷活着的workers
      for (driver <- waitingDrivers) { //在等待隊列中的Driver們會進行資源分配
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //當前的worker內存和cpu均大於當前driver請求的mem和cpu。則啓動
          launchDriver(worker, driver) //啓動Driver 內部實現是發送啓動Driver命令給指定Worker。Worker來啓動Driver。
          waitingDrivers -= driver //把啓動過的Driver從隊列移除
        }
      }
    }


Executor資源調度:

 Spark默認提供了一種在各個節點進行round-robin的調度,用戶可以本身設置這個flag
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

在介紹以前咱們先介紹一個概念,
可用的Worker:什麼是可用,可用就是 資源空暇足夠且知足必定的 規則來啓動當前App的Executor。

Spark定義了一個canUse方法:這種方法接受一個ApplicationInfo的描寫敘述信息和當前Worker的描寫敘述信息。
一、 當前worker的空暇內存 該app在每個slave要佔用的內存 (executor.memory默認512M)  
二、當前app從未在此worker啓動過App
總結: 從這點看出。要知足:該Worker的當前可用最小內存要比配置的executor內存大,並且對於同一個App僅僅能在一個Worker裏啓動一個Exeutor。假設要啓動第二個Executor。那麼請到其餘Worker裏。

這種纔算是對App可用的Worker。apache

  /**
   * Can an app use the given worker?

True if the worker has enough memory and we haven't already * launched an executor for the app on it (right now the standalone backend doesn't like having * two executors on the same worker). */ def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) }數組

SpreadOut分配策略:

SpreadOut分配策略是一種以round-robin方式遍歷集羣所有可用Worker。分配Worker資源,來啓動建立Executor的策略。優勢是儘量的將cores分配到各個節點,最大化負載均衡和高並行。

如下看看,默認的spreadOutApps模式啓動App的過程: 

 一、等待分配資源的apps隊列默認是FIFO的。
 二、app.coresLeft表示的是該app還有cpu資源沒申請到:   app.coresLeft  = 當前app申請的maxcpus - granted的cpus
 三、遍歷未分配全然的apps,繼續給它們分配資源,
 四、usableWorkers =  從當前ALIVE的Workers中過濾找出上文描寫敘述的可用Worker。而後依據cpus的資源空暇,從大到小給Workers排序。
 五、當toAssign(即將要分配的的core數>0,就找到可以的Worker持續分配)
 六、當可用Worker的free cores 大於 眼下該Worker已經分配的core時,再給它分配1個core,這樣分配是很是平均的方法。
 七、round-robin輪詢可用的Worker循環
 八、toAssign=0時結束循環。開始依據分配策略去真正的啓動Executor。


舉例: 1個APP申請了6個core, 現在有2個Worker可用。
      那麼: toAssign = 6,assigned = 2 
 那麼就會在assigned(1)和assigned(0)中輪詢平均分配cores,以+1 core的方式,終於每個Worker分到3個core。即每個Worker的啓動一個Executor。每個Executor得到3個cores。
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) { //對還未被全然分配資源的apps處理
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse //依據core Free對可用Worker進行降序排序。
        val numUsable = usableWorkers.length //可用worker的個數 eg:可用5個worker
        val assigned = new Array[Int](numUsable) //候選Worker,每個Worker一個下標,是一個數組,初始化默認都是0
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//還要分配的cores = 集羣中可用Worker的可用cores總和(10)。 當前未分配core(5)中找最小的
        var pos = 0
        while (toAssign > 0) { 
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在所有可用Worker裏推斷當前worker空暇cpu是否大於當前數組已經分配core值
            toAssign -= 1
            assigned(pos) += 1 //當前下標pos的Worker分配1個core +1
          }
          pos = (pos + 1) % numUsable //round-robin輪詢尋找有資源的Worker
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) { //假設assigned數組中的值>0,將啓動一個executor在。指定下標的機器上。
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app裏的Executor信息
            launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去啓動Executor
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {

非SpreadOut分配策略:

非SpreadOut策略。該策略:會盡量的依據每個Worker的剩餘資源來啓動Executor,這樣啓動的Executor可能僅僅在集羣的一小部分機器的Worker上。這樣作對node較少的集羣還能夠,集羣規模大了。Executor的並行度和機器負載均衡就不能夠保證了。


當用戶設定了參數 spark.deploy.spreadOutfalse時,觸發此遊戲分支 偷笑,跑個題,有些困了。

markdown

一、遍歷可用Workers
二、且遍歷Apps
三、比較當前Worker的可用core和app還需要分配的core。取最小值當作還需要分配的core
四、假設coreToUse大於0。則直接拿可用的core來啓動Executor。。

奉獻當前Worker全部資源。(Ps:挨個榨乾每個Worker的剩餘資源。。。。app

負載均衡


舉例: App申請12個core,3個Worker。Worker1剩餘1個core, Worke2r剩7個core, Worker3剩餘4個core.
這樣會啓動3個Executor。Executor1 佔用1個core, Executor2佔用7個core, Executor3佔用4個core.
總結:這樣是儘量的知足App,讓其儘快運行,而忽略了其並行效率和負載均衡。
 } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) { //直接問當前worker是有空暇的core
            val coresToUse = math.min(worker.coresFree, app.coresLeft) //有則取。不管多少
            if (coresToUse > 0) { //有
              val exec = app.addExecutor(worker, coresToUse) //直接啓動
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

3、總結:

 一、 在Worker Actor中。每次LaunchExecutor會建立一個CoarseGrainedExecutorBackend進程,一個Executor相應一個CoarseGrainedExecutorBackend

 二、針對同一個App。每個Worker裏僅僅能有一個針對該App的Executor存在。切記。

假設想讓整個App的Executor變多,設置SPARK_WORKER_INSTANCES。讓Worker變多。

 三、Executor的資源分配有2種策略:

3.一、SpreadOut :一種以round-robin方式遍歷集羣所有可用Worker。分配Worker資源。來啓動建立Executor的策略,優勢是儘量的將cores分配到各個節點。最大化負載均衡和高並行。

3.二、非SpreadOut:會盡量的依據每個Worker的剩餘資源來啓動Executor,這樣啓動的Executor可能僅僅在集羣的一小部分機器的Worker上。這樣作對node較少的集羣還能夠,集羣規模大了。Executor的並行度和機器負載均衡就不能夠保證了。


行文倉促,若有不正之處,請指出,歡迎討論 :)


補充:

一、關於:   一個App一個Worker爲何僅僅有贊成有針對該App的一個Executor 究竟這樣設計爲什麼? 的討論:

連城404:Spark是線程級並行模型。爲何需要一個worker爲一個app啓動多個executor呢?

樸動_zju:一個worker相應一個executorbackend是從mesos那一套遷移過來的,mesos下也是一個slave一個executorbackend。我理解這裏是可以實現起多個,但起多個貌似沒什麼優勢,而且添加了複雜度。

CrazyJvm@CodingCat 作了一個patch可以啓動多個,但是尚未被merge。 從Yarn的角度考慮的話,一個Worker可以相應多個executorbackend,正如一個nodemanager相應多個container。 @OopsOutOfMemory 

OopsOutOfMemory:回覆@連城404: 假設一個executor太大且裝的對象太多。會致使GC很是慢,多幾個Executor會下降full gc慢的問題。 see this post http://t.cn/RP1bVO4(今天 11:25)

連城404:回覆@OopsOutOfMemory:哦。這個考慮是有道理的。

一個workaround是單臺機器部署多個worker。worker相對來講比較便宜。

 

JerryLead:回覆@OopsOutOfMemory:看來都還在變化其中,standalone 和 YARN 仍是有很是多不一樣,咱們暫不下結論 (今天 11:35)

JerryLead:問題開始變得複雜了,是提升線程並行度仍是提升進程並行度?我想 Spark 仍是優先選擇前者,這樣 task 好管理。而且 broadcast,cache 的效率高些。後者有一些道理。但參數配置會變得更復雜,各有利弊吧 (今天 11:40)


未完待續。。

傳送門:@JerrLead  https://github.com/JerryLead/SparkInternals/blob/master/markdown/1-Overview.md


——EOF——

原創文章。轉載請註明來自:http://blog.csdn.net/oopsoom/article/details/38763985

相關文章
相關標籤/搜索