Spark 資源池簡介

   在一個application內部,不一樣線程提交的Job默認按照FIFO順序來執行,假設線程1先提交了一個job1,線程2後提交了一個job2,那麼默認狀況下,job2必須等待job1執行完畢後才能執行,若是job1是一個長做業,而job2是一個短做業,那麼這對於提交job2的那個線程的用戶來講很不友好:我這個job是一個短做業,怎麼執行了這麼長時間。html

   使用spark的公平調度算法能夠在必定程度上解決這個問題,此時,job2沒必要等待job1徹底運行完畢以後就能夠得到集羣資源來執行,最終的效果的就是,job2可能會在job1以前運行完畢。這對於一個更強調對資源的公平競爭的多用戶場景下是很是有用的,每個用戶均可以得到等量的資源,固然你能夠爲每個用戶指定一個優先級/權重,優先級/權重越高,得到的資源越多,好比對於一個長做業,你能夠爲指定更高的權重,而對於短做業,指定一個相對較低的權重。node

 

沒有顯示配置fairScheduler.xml下的公平調度算法算法

假設線程1提交了一個action,這個action觸發了一個jobId爲1的job。同時,在提交這個action以前,設置了spark.scheduler.pool:app

SparkContext.setLocalProperty(「spark.scheduler.pool」,」pool_name_1」)ide

假設線程2提交了一個action,這個action觸發了一個jobId爲2的job。同時,在提交這個action以前,也設置了spark.scheduler.pool:ui

SparkContext.setLocalProperty(「spark.scheduler.pool」,」pool_name_1」)this

假設線程3提交了一個action,這個action觸發了一個JobId=3的job,可是這個線程並無設置spark.scheduler.pool屬性。spa

 

  最後的spark 資源池邏輯上以下圖所示:線程

rootPool這個池子裏面有三個小池子,其名字分別爲:pool_name1,pool_name2,default;pool_name1這個池子存儲線程1提交的job,pool_name2存儲線程2提交的job,default池子存儲那些沒有顯示設置spark.scheduler.pool的線程提交的job,換句話說咱們將不一樣線程提交的job給隔離到不一樣的池子裏了。code

每個小池子都有如下三個能夠配置的屬性:weight,minshare,mode,他們的默認值以下:

weight=1

minshare=0

mode=FIFO

一個池子的weight值越大,其得到資源就越多,在上圖中,由於這三個池子的weight值相同,因此他們將得到等量的資源。

一個池子的minShare表示這個池子至少得到的core個數。

mode能夠是FIFO或者FAIR,若是爲FIFO,那麼池子裏jobid越大的job(等價的,先提交的job),將越先得到集羣資源;若是是FAIR,那麼將採用一種更加公平的機制來調度job,這個後面再說。

 

顯示配置fairScheduler.xml的公平調度算法

能夠發現,上面那種經過在線程裏設置spark.scheduler.pool的方式,所建立的池子的屬性採用的都是默認值,並且一旦建立好以後你就不能再修改他們。spark提供了另一種建立池子的方式,你能夠配置conf/fairScheduler.xml文件,假設其內容以下(官方提供的內容):

 <?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

 

若是配置了fairScheduler.xml文件,而且其內容如上所示,那麼此時的spark 資源池的樣子大體以下:

 

 

這個資源池裏面一樣有三個小池子,其名字分別爲:production,test,default。其中production資源池的weight爲2,他將得到更多的資源(與default池子相比),因爲其minShare=3,因此他最低將得到3個core,其mode=FAIR,因此提交到這個池子裏的job將按照FAIR算法來調度。

 

事實上,經過這兩個圖已經可以在腦海裏對spark資源池產生一個大體的印象了,此時再去看spark 資源池的源碼就會很是容易。

 

在初次閱讀FIFO算法源碼以前:須要重點關注兩個屬性,priority和stageId,其中的priority就是jobid。先提交的job,其jobid越小,所以priority就越小。finalStage其stageId最大,其parent stage 的stageId較小。

 

 

對於公平調度算法,給定兩個池子a和b,誰優先得到資源?

1.若是a阻塞了可是b沒有阻塞,那麼先執行a

2.若是a沒有阻塞,b阻塞了,先執行b

3.若是a和b都阻塞了,那麼阻塞程度高(等待執行的task比例大) 的那個先執行

4.若是a和b都沒有阻塞,那麼資源少的那個先執行。

5.若是以上條件都不知足,那麼按照a和b的名字來排序。

 

仍是看一下這個算法的實現吧:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
    
    //若是s1阻塞,s2沒有阻塞,那麼就先執行s1
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false //若是s1沒有,s2阻塞了,就先執行s2
    } else if (s1Needy && s2Needy) {//若是兩者都阻塞了,那就看誰阻塞程度大
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {//都沒阻塞,那麼看誰的資源少。
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name//實在不行了,按照池子的name排序吧。
    }
  }
}

 

源碼走讀

TaskSchedulerImpl在收到DAGScheduler提交的TaskSet時執行以下方法:

 

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //建立TaskSetManager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      
      //將TaskSetManager添加到資源池,properties裏面存儲了咱們調用SparkContext.setLocalProperty時傳遞的poolName
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //調用CoarseGrainedSchedulerBakckend的reviveOffers()
    backend.reviveOffers()
  }

 

 

 

這個方法主要作兩件事情:第一建立TaskSetManager而後將其添加到資源池,第二調用CoarseGrainedSchedulerBackend進行資源調度。

咱們重點講解資源池的構造和資源池的添加,所以重點關注schedulerBuilder。他是一個trait,主要有兩個實現:FIFOSchedulableBuilder和FairSchedulableBuilder。

/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def rootPool: Pool

  //構建資源池,在建立SchedulerBuilder時,會調用buildPools方法來構建池子
  def buildPools()

  //將資源池添加到rootPool中
  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

 

這裏重點關注他的FairSchedulerBuilder。

addTaskSetManager

 override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    var poolName = DEFAULT_POOL_NAME
    //
    var parentPool = rootPool.getSchedulableByName(poolName)
    
    //若是用戶設置了spark.scheduler.pool
    if (properties != null) {
      //默認用戶設置的spark.scheduler.pool的值
      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      parentPool = rootPool.getSchedulableByName(poolName)
      
      //若是沒有這個池子,就建立一個新的池子
      if (parentPool == null) {
        // we will create a new pool that user has configured in app
        // instead of being defined in xml file
        //此時的mode,minshare,weight都採用默認值,所以能夠發現,在經過設置spark.scheduler.pool這種方式生成的池子
        //採用的都是默認值
        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
        //最後將新建立的池子添加到rootPool中。
        rootPool.addSchedulable(parentPool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      }
    }
    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  }

 

從這裏能夠看出,經過在線程裏使用SparkContext.setLocalProperty來設置spark.scheduler.pool所生成的資源池,其weight,minShare,mode採用的都是默認值,這在某些場景可能不知足用戶要求,此時就須要顯示的配置fairScheduler.xml文件了。

若是用戶建立了fairScheduler.xml,那麼會調用buildPools讀取這個文件,來建立用戶配置的池子:

override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option {
        schedulerAllocFile.map { f =>
          new FileInputStream(f)
        }.getOrElse {
          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        }
      }

      //構建fairScheduler.xml中指定的池子
      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {
      is.foreach(_.close())
    }

    // finally create "default" pool
  //構建默認池子,也就是default池子。
    buildDefaultPool()
  }

 

 private def buildFairSchedulerPool(is: InputStream) {
    val xml = XML.load(is)
    for (poolNode <- (xml \\ POOLS_PROPERTY)) {

      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
      var schedulingMode = DEFAULT_SCHEDULING_MODE
      var minShare = DEFAULT_MINIMUM_SHARE
      var weight = DEFAULT_WEIGHT

      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
      if (xmlSchedulingMode != "") {
        try {
          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
        } catch {
          case e: NoSuchElementException =>
            logWarning("Error xml schedulingMode, using default schedulingMode")
        }
      }

      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
      if (xmlMinShare != "") {
        minShare = xmlMinShare.toInt
      }

      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
      if (xmlWeight != "") {
        weight = xmlWeight.toInt
      }

      //建立用戶配置的池子
      val pool = new Pool(poolName, schedulingMode, minShare, weight)
      rootPool.addSchedulable(pool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, schedulingMode, minShare, weight))
    }
  }

 

總結:

若是開啓了fari公平調度算法,而且在提交action的線程裏面設置了sparkContext.setLocalPropery("spark.scheduler.pool",poolname),那麼這個線程提交的全部job都被提交到poolName指定的資源池裏,若是poolName指定的資源池不存在,那麼將使用默認值來自動建立他。一種更加靈活的建立池子的方式是用戶顯示的配置fairScheduler.xml文件,你能夠顯示的設置池子的weight,minShare,mode值。

 

 

因爲本人接觸spark時間不長,若有錯誤或者任何意見能夠在留言或者發送郵件到franciswbs@163.com,讓咱們一塊兒交流。

 

做者:FrancisWang 

郵箱:franciswbs@163.com出處:http://www.cnblogs.com/francisYoung/本文地址:http://www.cnblogs.com/francisYoung/p/5209798.html本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。

相關文章
相關標籤/搜索