Spark SQL小文件問題在OPPO的解決方案

本文來自OPPO互聯網基礎技術團隊,轉載請註名做者。同時歡迎關注咱們的公衆號:OPPO_tech,與你分享OPPO前沿互聯網技術及活動。sql

Spark SQL小文件是指文件大小顯著小於hdfs block塊大小的的文件。過於繁多的小文件會給HDFS帶來很嚴重的性能瓶頸,對任務的穩定和集羣的維護會帶來極大的挑戰。bash

通常來講,經過Hive調度的MR任務均可以簡單設置以下幾個小文件合併的參數來解決任務產生的小文件問題:markdown

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=xxxx;
set hive.merge.smallfiles.avgsize=xxx;
複製代碼

然而在咱們將離線調度任務逐步從Hive遷移到Spark的過程當中,因爲Spark自己並不支持小文件合併功能,小文件問題日益突出,對集羣穩定性形成很大影響,一度阻礙了咱們的遷移工做。性能

爲了解決小文件問題,咱們經歷了從開始的不斷調整參數到後期的代碼開發等不一樣階段,這裏給你們作一個簡單的分享。spa

1. Spark爲何會產生小文件

Spark生成的文件數量直接取決於RDD裏partition的數量和表分區數量。注意這裏的兩個分區概念並不相同,RDD的分區與任務並行度相關,而表分區則是Hive的分區數目。生成的文件數目通常是RDD分區數和表分區的乘積。所以,當任務並行度太高或者分區數目很大時,很容易產生不少的小文件。code

所以,若是須要從參數調整來減小生成的文件數目,就只能經過減小最後一個階段RDD的分區數來達到了(減小分區數目限制於歷史數據和上下游關係,難以修改)orm

2. 基於社區版本的參數進行調整的方案

2.1 不含有Shuffle算子的簡單靜態分區SQL

這樣的SQL比較簡單,主要是filter上游表一部分數據寫入到下游表,或者是兩張表簡單UNION起來的任務,這種任務的分區數目主要是由讀取文件時Partition數目決定的。排序

  • 由於從Spark 2.4以來,對Hive orc表和parquet支持已經很不錯了,爲了加快運行速率,咱們開啓了將Hive orc/parquet表自動轉爲DataSource的參數。對於這種DataSource表的類型,partition數目主要是由以下三個參數控制其關係。
spark.sql.files.maxPartitionBytes;
spark.sql.files.opencostinbytes;
spark.default.parallelism;
複製代碼

其關係以下圖所示,所以能夠經過調整這三個參數來輸入數據的分片進行調整:開發

  • 而非DataSource表,使用CombineInputFormat來讀取數據,所以主要是經過MR參數來進行分片調整:mapreduce.input.fileinputformat.split.minsize

雖然咱們能夠經過調整輸入數據的分片來對最終文件數量進行調整,可是這樣的調整是不穩定的,上游數據大小發生一些輕微的變化,就可能帶來參數的從新適配。get

爲了簡單粗暴的解決這個問題,咱們對這樣的SQL加了repartition的hint,引入了新的shuffle,保證文件數量是一個固定值。

2.2 帶有Shuffle算子的靜態分區任務

在ISSUE SPARK-9858中,引入了一個新的參數: spark.sql.adaptive.shuffle.targetPostShuffleInputSize,

後期基於spark adaptive又對這個參數作了進一步加強,能夠動態的調整partition數量,儘量保證每一個task處理targetPostShuffleInputSize大小的數據,所以這個參數咱們也能夠用來在必定程度上控制生成的文件數量。

2.3 動態分區任務

動態分區任務由於存在着分區這一變量,單純調整rdd這邊的partition數目很難把控總體的文件數量。

在hive裏,咱們能夠經過設置hive.optimize.sort.dynamic.partition來緩解動態分區產生文件過多致使任務執行時task節點常常oom的情況。這樣的參數會引入新的的shuffle,來對數據進行重排序,將相同的partition分給同一個task處理,從而避免了一個task同時持有多個文件句柄。

所以,咱們能夠藉助這樣的思想,使用distribute by語句來修改sql,從而控制文件數量。通常而言,假設咱們想對於每一個分區生成不超過N個文件,則能夠在SQL末尾增長DISTRIBUTE BY [動態分區列],ceil(rand() * N)。

3. 自研可合併文件的commitProtocol方案

綜上種種,每一個方法都存在必定的弊端,衆多規則也在實際使用過程當中對業務方形成很大困擾。

所以咱們產生了想在spark這邊實現和hive相似的小文件合併機制。在幾個可能的方案選型中,咱們最終選擇了:重寫spark.sql.sources.commitProtocolClass方法。

一方面,該方案對Spark代碼無侵入,便於Spark源碼的維護,另外一方面,該方案對業務方使用友好,能夠動態經過set命令設置,若是出現問題回滾也十分方便。業務方在使用過程當中,只須要簡單設置: spark.sql.sources.commitProtocolClass,便可控制是否開啓小文件合併。

在開啓小文件合併參數後,咱們會在commit階段拿到生成的全部文件,引入兩個新的job來對這些文件進行處理。首先咱們在第一個job獲取到全部大小小於spark.compact.smallfile.size的文件,在查找完成後按照spark.compact.size參數值對組合文件,並在第二個job中對這些文件進行合併。

相關文章
相關標籤/搜索