在一個application內部,不一樣線程提交的Job默認按照FIFO順序來執行,假設線程1先提交了一個job1,線程2後提交了一個job2,那麼默認狀況下,job2必須等待job1執行完畢後才能執行,若是job1是一個長做業,而job2是一個短做業,那麼這對於提交job2的那個線程的用戶來講很不友好:我這個job是一個短做業,怎麼執行了這麼長時間。html
使用spark的公平調度算法能夠在必定程度上解決這個問題,此時,job2沒必要等待job1徹底運行完畢以後就能夠得到集羣資源來執行,最終的效果的就是,job2可能會在job1以前運行完畢。這對於一個更強調對資源的公平競爭的多用戶場景下是很是有用的,每個用戶均可以得到等量的資源,固然你能夠爲每個用戶指定一個優先級/權重,優先級/權重越高,得到的資源越多,好比對於一個長做業,你能夠爲指定更高的權重,而對於短做業,指定一個相對較低的權重。node
沒有顯示配置fairScheduler.xml下的公平調度算法算法
假設線程1提交了一個action,這個action觸發了一個jobId爲1的job。同時,在提交這個action以前,設置了spark.scheduler.pool:app
SparkContext.setLocalProperty(「spark.scheduler.pool」,」pool_name_1」)ide
假設線程2提交了一個action,這個action觸發了一個jobId爲2的job。同時,在提交這個action以前,也設置了spark.scheduler.pool:ui
SparkContext.setLocalProperty(「spark.scheduler.pool」,」pool_name_1」)this
假設線程3提交了一個action,這個action觸發了一個JobId=3的job,可是這個線程並無設置spark.scheduler.pool屬性。spa
最後的spark 資源池邏輯上以下圖所示:線程
rootPool這個池子裏面有三個小池子,其名字分別爲:pool_name1,pool_name2,default;pool_name1這個池子存儲線程1提交的job,pool_name2存儲線程2提交的job,default池子存儲那些沒有顯示設置spark.scheduler.pool的線程提交的job,換句話說咱們將不一樣線程提交的job給隔離到不一樣的池子裏了。code
每個小池子都有如下三個能夠配置的屬性:weight,minshare,mode,他們的默認值以下:
weight=1
minshare=0
mode=FIFO
一個池子的weight值越大,其得到資源就越多,在上圖中,由於這三個池子的weight值相同,因此他們將得到等量的資源。
一個池子的minShare表示這個池子至少得到的core個數。
mode能夠是FIFO或者FAIR,若是爲FIFO,那麼池子裏jobid越大的job(等價的,先提交的job),將越先得到集羣資源;若是是FAIR,那麼將採用一種更加公平的機制來調度job,這個後面再說。
顯示配置fairScheduler.xml的公平調度算法
能夠發現,上面那種經過在線程裏設置spark.scheduler.pool的方式,所建立的池子的屬性採用的都是默認值,並且一旦建立好以後你就不能再修改他們。spark提供了另一種建立池子的方式,你能夠配置conf/fairScheduler.xml文件,假設其內容以下(官方提供的內容):
<?xml version="1.0"?> <allocations> <pool name="production"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>
若是配置了fairScheduler.xml文件,而且其內容如上所示,那麼此時的spark 資源池的樣子大體以下:
這個資源池裏面一樣有三個小池子,其名字分別爲:production,test,default。其中production資源池的weight爲2,他將得到更多的資源(與default池子相比),因爲其minShare=3,因此他最低將得到3個core,其mode=FAIR,因此提交到這個池子裏的job將按照FAIR算法來調度。
事實上,經過這兩個圖已經可以在腦海裏對spark資源池產生一個大體的印象了,此時再去看spark 資源池的源碼就會很是容易。
在初次閱讀FIFO算法源碼以前:須要重點關注兩個屬性,priority和stageId,其中的priority就是jobid。先提交的job,其jobid越小,所以priority就越小。finalStage其stageId最大,其parent stage 的stageId較小。
對於公平調度算法,給定兩個池子a和b,誰優先得到資源?
1.若是a阻塞了可是b沒有阻塞,那麼先執行a
2.若是a沒有阻塞,b阻塞了,先執行b
3.若是a和b都阻塞了,那麼阻塞程度高(等待執行的task比例大) 的那個先執行
4.若是a和b都沒有阻塞,那麼資源少的那個先執行。
5.若是以上條件都不知足,那麼按照a和b的名字來排序。
仍是看一下這個算法的實現吧:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare: Int = 0 //若是s1阻塞,s2沒有阻塞,那麼就先執行s1 if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) { return false //若是s1沒有,s2阻塞了,就先執行s2 } else if (s1Needy && s2Needy) {//若是兩者都阻塞了,那就看誰阻塞程度大 compare = minShareRatio1.compareTo(minShareRatio2) } else {//都沒阻塞,那麼看誰的資源少。 compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name//實在不行了,按照池子的name排序吧。 } } }
源碼走讀
TaskSchedulerImpl在收到DAGScheduler提交的TaskSet時執行以下方法:
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { //建立TaskSetManager val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } //將TaskSetManager添加到資源池,properties裏面存儲了咱們調用SparkContext.setLocalProperty時傳遞的poolName schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } //調用CoarseGrainedSchedulerBakckend的reviveOffers() backend.reviveOffers() }
這個方法主要作兩件事情:第一建立TaskSetManager而後將其添加到資源池,第二調用CoarseGrainedSchedulerBackend進行資源調度。
咱們重點講解資源池的構造和資源池的添加,所以重點關注schedulerBuilder。他是一個trait,主要有兩個實現:FIFOSchedulableBuilder和FairSchedulableBuilder。
/** * An interface to build Schedulable tree * buildPools: build the tree nodes(pools) * addTaskSetManager: build the leaf nodes(TaskSetManagers) */ private[spark] trait SchedulableBuilder { def rootPool: Pool //構建資源池,在建立SchedulerBuilder時,會調用buildPools方法來構建池子 def buildPools() //將資源池添加到rootPool中 def addTaskSetManager(manager: Schedulable, properties: Properties) }
這裏重點關注他的FairSchedulerBuilder。
addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME // var parentPool = rootPool.getSchedulableByName(poolName) //若是用戶設置了spark.scheduler.pool if (properties != null) { //默認用戶設置的spark.scheduler.pool的值 poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) parentPool = rootPool.getSchedulableByName(poolName) //若是沒有這個池子,就建立一個新的池子 if (parentPool == null) { // we will create a new pool that user has configured in app // instead of being defined in xml file //此時的mode,minshare,weight都採用默認值,所以能夠發現,在經過設置spark.scheduler.pool這種方式生成的池子 //採用的都是默認值 parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) //最後將新建立的池子添加到rootPool中。 rootPool.addSchedulable(parentPool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } parentPool.addSchedulable(manager) logInfo("Added task set " + manager.name + " tasks to pool " + poolName) }
從這裏能夠看出,經過在線程裏使用SparkContext.setLocalProperty來設置spark.scheduler.pool所生成的資源池,其weight,minShare,mode採用的都是默認值,這在某些場景可能不知足用戶要求,此時就須要顯示的配置fairScheduler.xml文件了。
若是用戶建立了fairScheduler.xml,那麼會調用buildPools讀取這個文件,來建立用戶配置的池子:
override def buildPools() { var is: Option[InputStream] = None try { is = Option { schedulerAllocFile.map { f => new FileInputStream(f) }.getOrElse { Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) } } //構建fairScheduler.xml中指定的池子 is.foreach { i => buildFairSchedulerPool(i) } } finally { is.foreach(_.close()) } // finally create "default" pool //構建默認池子,也就是default池子。 buildDefaultPool() }
private def buildFairSchedulerPool(is: InputStream) { val xml = XML.load(is) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text var schedulingMode = DEFAULT_SCHEDULING_MODE var minShare = DEFAULT_MINIMUM_SHARE var weight = DEFAULT_WEIGHT val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text if (xmlSchedulingMode != "") { try { schedulingMode = SchedulingMode.withName(xmlSchedulingMode) } catch { case e: NoSuchElementException => logWarning("Error xml schedulingMode, using default schedulingMode") } } val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text if (xmlMinShare != "") { minShare = xmlMinShare.toInt } val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text if (xmlWeight != "") { weight = xmlWeight.toInt } //建立用戶配置的池子 val pool = new Pool(poolName, schedulingMode, minShare, weight) rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } }
總結:
若是開啓了fari公平調度算法,而且在提交action的線程裏面設置了sparkContext.setLocalPropery("spark.scheduler.pool",poolname),那麼這個線程提交的全部job都被提交到poolName指定的資源池裏,若是poolName指定的資源池不存在,那麼將使用默認值來自動建立他。一種更加靈活的建立池子的方式是用戶顯示的配置fairScheduler.xml文件,你能夠顯示的設置池子的weight,minShare,mode值。
因爲本人接觸spark時間不長,若有錯誤或者任何意見能夠在留言或者發送郵件到franciswbs@163.com,讓咱們一塊兒交流。
做者:FrancisWang
郵箱:franciswbs@163.com出處:http://www.cnblogs.com/francisYoung/本文地址:http://www.cnblogs.com/francisYoung/p/5209798.html本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。