Spark做業調度

概述

研究Spark做業調度,是爲了合理使用集羣的資源。更具體一點,是看看是否提供了能夠個性化配置的點,而後根據應用的具體狀況制定配置或者使用方案。node

本文參考官網做業調度文檔apache

spark的做業調度分爲兩個場景:跨應用的調度和應用內部的調度,下面分別介紹。緩存

聲明:文中配圖是本身的理解,並不敢保證其準確性。網絡

跨應用調度

跨應用的調度是由底層的集羣管理器負責的,有兩種資源分配策略。spa

一種是靜態資源分隔,即一個應用一開始就申請全部的資源,並在程序運行期間使用持有這些資源。線程

一種是動態資源分配,應用根據本身的負載狀況動態請求或釋放資源。這種策略默認是不開啓的。scala

靜態資源分隔

全部的集羣管理器都支持靜態資源分隔,只是具體的配置策略不一樣:code

Standalone mode

提交到Standalone mode集羣的應用會以FIFO的順序運行,每個正在運行的應用都會嘗試佔用全部的可用資源。使用下面的配置項能夠限制每一個應用申請的資源:xml

spark.cores.max

應用能夠申請的最大數量的CPU核的數量,若是沒有設置,取spark.deploy.defaultCores的值。

spark.executor.memory

分配給每一個executor進程的內存資源。

Mesos

爲了使用靜態資源隔離,須要設置spark.mesos.coarse爲true,這稱爲粗粒度的Mesos模式。

另外,spark.cores.maxspark.executor.memory在Mesos模式下一樣有效。

YARN

--num-executors

在使用spark-submit提交做業時,可使用--num-executors選項請求指定的executor個數。

在程序內部,能夠經過設置spark.executor.instances屬性達到一樣的目的。

--executor-memory

在使用spark-submit提交做業時,可使用--executor-memory選項設置每一個executor申請的內存。

在程序內部,能夠經過設置spark.executor.memory屬性達到一樣的目的。

--executor-cores

在使用spark-submit提交做業時,可使用--executor-cores選項設置每一個executor申請的CPU核。

在程序內部,能夠經過設置spark.executor.cores屬性達到一樣的目的。

動態資源分配

spark的運行模型是基於executor的,executor是資源的實際持有者。因此動態資源分配,是經過動態的申請executor和釋放executor來實現的。

動態資源分配涉及到兩個方面,如何在須要的時候動態申請資源,以及如何在空閒的時候動態釋放資源。

動態請求策略:若是一個應用有tasks在等待,超過必定的時間(spark.dynamicAllocation.schedulerBacklogTimeout秒)就會申請1個executor。此後每隔必定的時間(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒)就檢測應用是否有tasks在等待,有就繼續申請executor。

動態請求資源的數量是指數級的,第一次申請1個,第二次申請2個,接着是4, 8 ...這種考慮是爲了在謹慎申請資源的同時,又能夠在容許的時間範圍內得到真正須要的資源量。

動態釋放資源:是經過檢查應用佔據的executor是否超過了指定的時間(spark.dynamicAllocation.executorIdleTimeout秒)來決定的,超過了就釋放。

釋放資源的條件和請求資源的條件是互斥的,即若是一個應用有tasks在排隊,就不該該會有空閒的executor。

how to do

爲了使用動態資源分配,須要作兩件事:

  1. 設置spark.dynamicAllocation.enabled值爲true
  2. 在每個工做節點啓動external shuffle service,並設置spark.shuffle.service.enabled爲true

external shuffle service的做用在後面會介紹,不一樣集羣模式下啓動external shuffle service的方式不一樣:

  • Standalone模式,不須要額外的工做來啓動external shuffle service,只須要設置spark.shuffle.service.enabled爲true便可。
  • Mesos粗粒度模式,在每個slave nodes運行腳本$SPARK_HOME/sbin/start-mesos-shuffle-service.sh來啓動external shuffle service
  • YARN模式,參考Configuring the External Shuffle Service On Yarn

動態移除executor面對的問題

動態釋放資源須要額外的支持,由於executor可能會產生中間結果並輸出到本地,在須要的時候須要經過這個executor獲取它的中間結果。冒然移除executor會丟失它計算的中間結果,致使在真正須要的時候又要從新計算。

好比在map階段executor輸出map結果,在shuffle階段這些map結果又須要經過executor讀出來傳送到負責reduce的executor。

spark經過external shuffle service來解決這個問題。external shuffle service是指在每個node都運行的一個長期進程,這個進程獨立於應用和executor,負責提供executor的輸出數據的獲取服務。原來executor之間相互請求來獲取對方的輸出結果,變成了統一從external shuffle service獲取結果。

即便executor已經被移除了,它所輸出的數據依然能夠經過external shuffle service來獲取。

另外,executor還可能會把中間結果緩存到內存,目前的策略是不移除此類的executor。將來可能採起將緩存持久化的方式,進而釋放executor。

應用內部調度

一個spark應用能夠支持多個不一樣線程的job同時提交,這常見於spark應用提供網絡服務的場景。

spark默認的調度策略是FIFO,若是隊列頭部的job比較大,佔用了集羣的全部資源,後面的小任務將遲遲得不到運行的機會。

另外,spark還支持配置FAIR調度,spark循環調度每一個job的task。這樣即便有大job在運行,剛提交的小job也能夠及時得到資源,而不須要等到大job結束。

經過設置屬性spark.scheduler.mode來啓用公平調度:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平調度池

spark支持公平調度池的概念,每一個線程能夠指定將jobs提交到哪一個池子,最細粒度的場景下是每一個線程對應一個池,也能夠多個線程使用同一個池。

每一個線程默認使用的池是default,也能夠經過設置參數來明確指定池。

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

若是想重置當前線程綁定的池子,調用sc.setLocalProperty("spark.scheduler.pool", null)

能夠經過配置文件將資源按照必定的比重分配到池,配置文件的模板:conf/fairscheduler.xml.template

經過conf.set("spark.scheduler.allocation.file", "/path/to/file")指定配置文件。

每一個池可支持的參數有三個:

  • schedulingMode:FIFO 或 FAIR,FIFO是默認的策略。
  • weight:每一個池子分配資源的權重,默認狀況下全部的權重爲1。
  • minShare:最小資源,CPU核的數量,默認爲0。在進行資源分配時,老是最早知足全部池的minShare,再根據weight分配剩下的資源。

配置文件示例:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

沒有出如今配置文件中的池,全部參數取默認值(schedulingMode=FIFO,weight=1,minShare=0)。

概念澄清

executor到底指什麼?和容器、JVM的關係是怎樣的?

executor是負責必定職責的程序組件,能夠在已有的JVM中運行(好比local mode),也能夠在新的JVM中運行。使用YARN時,executor是在YARN容器中運行的。

spark的job - stage - task的劃分是怎麼樣的?

spark的job能夠劃分爲多個stage,這些stage構成了DAG。每個stage又能夠劃分爲多個tasks。stage的劃分是根據shuffle map task來的,這一類的task至關於MapReduce中shuffle的map端,負責在本地RDD分區進行計算,並將結果輸出到新的分區,供後續的使用。在劃分stage時,shuffle map任務做爲階段的結束的邊界。

Mesos的粗粒度和細粒度

Mesos能夠啓用CPU核的共享,即同一個節點executor在不使用核的狀況下可讓給另外一個executor來使用。

不啓用CPU核共享稱爲粗粒度,啓用則稱爲細粒度,相關的配置項爲spark.mesos.coarse,值爲true表示粗粒度。

相關文章
相關標籤/搜索