如下是字節跳動數據倉庫架構負責人-郭俊的分享主題沉澱:《字節跳動在Spark SQL上的核心優化實踐》。前端
上述信息存於 Catalog 內。在生產環境中,通常由 Hive Metastore 提供 Catalog 服務。Analyzer 會結合 Catalog 將 Unresolved Logical Plan 轉換爲 Resolved Logical Plan。算法
到這裏還不夠。不一樣的人寫出來的 SQL 不同,生成的 Resolved Logical Plan 也就不同,執行效率也不同。爲了保證不管用戶如何寫 SQL 均可以高效的執行,Spark SQL 須要對 Resolved Logical Plan 進行優化,這個優化由 Optimizer 完成。Optimizer 包含了一系列規則,對 Resolved Logical Plan 進行等價轉換,最終生成 Optimized Logical Plan。該 Optimized Logical Plan 不能保證是全局最優的,但至少是接近最優的。緩存
因爲同一種邏輯算子能夠有多種物理實現。如 Join 有多種實現,ShuffledHashJoin、BroadcastHashJoin、BroadcastNestedLoopJoin、SortMergeJoin 等。所以 Optimized Logical Plan 可被 Query Planner 轉換爲多個 Physical Plan。如何選擇最優的 Physical Plan 成爲一件很是影響最終執行性能的事情。一種比較好的方式是,構建一個 Cost Model,並對全部候選的 Physical Plan 應用該 Model 並挑選 Cost 最小的 Physical Plan 做爲最終的 Selected Physical Plan。性能優化
後面介紹字節跳動在 Spark SQL 上作的一些優化,主要圍繞這一節介紹的邏輯計劃優化與物理計劃優化展開。架構
下圖展現了 SortMergeJoin 的基本原理。用虛線框表明的 Table 1 和 Table 2 是兩張須要按某字段進行 Join 的表。虛線框內的 partition 0 到 partition m 是該錶轉換成 RDD 後的 Partition,而非表的分區。假設 Table 1 與 Table 2 轉換爲 RDD 後分別包含 m 和 k 個 Partition。爲了進行 Join,須要經過 Shuffle 保證相同 Join Key 的數據在同一個 Partition 內且 Partition 內按 Key 排序,同時保證 Table 1 與 Table 2 通過 Shuffle 後的 RDD 的 Partition 數相同。app
對於大數據的場景來說,數據通常是一次寫入屢次查詢。若是常常對兩張表按相同或相似的方式進行 Join,每次都須要付出 Shuffle 的代價。與其這樣,不如讓數據在寫的時候,就讓數據按照利於 Join 的方式分佈,從而使得 Join 時無需進行 Shuffle。以下圖所示,Table 1 與 Table 2 內的數據按照相同的 Key 進行分桶且桶數都爲 n,同時桶內按該 Key 排序。對這兩張表進行 Join 時,能夠避免 Shuffle,直接啓動 n 個 Task 進行 Join。運維
改進一:支持與 Hive 兼容異步
爲了解決這個問題,咱們讓 Spark SQL 支持 Hive 兼容模式,從而保證 Spark SQL 寫入的 Bucket 表與 Hive 寫入的 Bucket 表效果一致,而且這種表能夠被 Hive 和 Spark SQL 當成 Bucket 表進行 Bucket Join 而不須要 Shuffle。經過這種方式保證 Hive 向 Spark SQL 的透明遷移。oop
改進二:支持倍數關係Bucket Join性能
第一種方式,Task 個數與小表 Bucket 個數相同。以下圖所示,Table A 包含 3 個 Bucket,Table B 包含 6 個 Bucket。此時 Table B 的 bucket 0 與 bucket 3 的數據合集應該與 Table A 的 bucket 0 進行 Join。這種狀況下,能夠啓動 3 個 Task。其中 Task 0 對 Table A 的 bucket 0 與 Table B 的 bucket 0 + bucket 3 進行 Join。在這裏,須要對 Table B 的 bucket 0 與 bucket 3 的數據再作一次 merge sort 從而保證合集有序。
若是 Table A 與 Table B 的 Bucket 個數相差不大,可使用上述方式。若是 Table B 的 Bucket 個數是 Bucket A Bucket 個數的 10 倍,那上述方式雖然避免了 Shuffle,但可能由於並行度不夠反而比包含 Shuffle 的 SortMergeJoin 速度慢。此時可使用另一種方式,即 Task 個數與大表 Bucket 個數相等,以下圖所示。
改進三:支持BucketJoin 降級
同時,因爲數據量上漲快,平均 Bucket 大小也快速增加。這會形成單 Task 須要處理的數據量過大進而引發使用 Bucket 後的效果可能不如直接使用基於 Shuffle 的 Join。
改進四:支持超集
以下圖所示,Table X 與 Table Y,都按字段 A 分 Bucket。而查詢須要對 Table X 與 Table Y 進行 Join,且 Join Key Set 爲 A 與 B。此時,因爲 A 相等的數據,在兩表中的 Bucket ID 相同,那 A 與 B 各自相等的數據在兩表中的 Bucket ID 確定也相同,因此數據分佈是知足 Join 要求的,不須要 Shuffle。同時,Bucket Join 還須要保證兩表按 Join Key Set 即 A 和 B 排序,此時只須要對 Table X 與 Table Y 進行分區內排序便可。因爲兩邊已經按字段 A 排序了,此時再按 A 與 B 排序,代價相對較低。
物化列
Spark SQL 處理嵌套類型數據時,存在如下問題:
不支持 Filter 下推:目前(2019年10月26日)的 Spark 不支持嵌套類型字段上的 Filter 的下推
重複計算:JSON 字段,在 Spark SQL 中以 String 類型存在,嚴格來講不算嵌套數據類型。不過實踐中也經常使用於保存不固定的多個字段,在查詢時經過 JSON Path 抽取目標子字段,而大型 JSON 字符串的字段抽取很是消耗 CPU。對於熱點表,頻繁重複抽取相同子字段很是浪費資源。
對於這個問題,作數倉的同窗也想了一些解決方案。以下圖所示,在名爲 base_table 的表以外建立了一張名爲 sub_table 的表,而且將高頻使用的子字段 people.age 設置爲一個額外的 Integer 類型的字段。下游再也不經過 base_table 查詢 people.age,而是使用 sub_table 上的 age 字段代替。經過這種方式,將嵌套類型字段上的查詢轉爲了 Primitive 類型字段的查詢,同時解決了上述問題。
下圖展現了在某張核心表上使用物化列的收益:
在 OLAP 領域,常常會對相同表的某些固定字段進行 Group By 和 Aggregate / Join 等耗時操做,形成大量重複性計算,浪費資源,且影響查詢性能,不利於提高用戶體驗。
如上圖所示,查詢歷史顯示大量查詢根據 user 進行 group by,而後對 num 進行 sum 或 count 計算。此時可建立一張物化視圖,且對 user 進行 gorup by,對 num 進行 avg(avg 會自動轉換爲 count 和 sum)。用戶對原始表進行 select user, sum(num) 查詢時,Spark SQL 自動將查詢重寫爲對物化視圖的 select user, sum_num 查詢。
Spark SQL 引擎上的其它優化
如上圖所示,咱們將 Shuffle 上游 Stage 稱爲 Mapper Stage,其中的 Task 稱爲 Mapper。Shuffle 下游 Stage 稱爲 Reducer Stage,其中的 Task 稱爲 Reducer。
每一個 Mapper 會將本身的數據分爲最多 N 個部分,N 爲 Reducer 個數。每一個 Reducer 須要去最多 M (Mapper 個數)個 Mapper 獲取屬於本身的那部分數據。
基於HDFS的Shuffle穩定性提高
如上圖所示,機器的 CPU 使用率接近 100%,使得 Mapper 側的 Node Manager 內的 Spark External Shuffle Service 沒法及時提供 Shuffle 服務。
不管是何種緣由,問題的癥結都是 Mapper 側的 Shuffle Write 數據只保存在本地,一旦該節點出現問題,會形成該節點上全部 Shuffle Write 數據沒法被 Reducer 讀取。解決這個問題的一個通用方法是,經過多副本保證可用性。
最初始的一個簡單方案是,Mapper 側最終數據文件與索引文件不寫在本地磁盤,而是直接寫到 HDFS。Reducer 再也不經過 Mapper 側的 External Shuffle Service 讀取 Shuffle 數據,而是直接從 HDFS 上獲取數據,以下圖所示。
快速實現這個方案後,咱們作了幾組簡單的測試。結果代表:
緣由在於,總共 10000 Reducer,須要從 10000 個 Mapper 處讀取數據文件和索引文件,總共須要讀取 HDFS 10000 * 1000 * 2 = 2 億次。
若是隻是 Name Node 的單點性能問題,還能夠經過一些簡單的方法解決。例如在 Spark Driver 側保存全部 Mapper 的 Block Location,而後 Driver 將該信息廣播至全部 Executor,每一個 Reducer 能夠直接從 Executor 處獲取 Block Location,而後無須鏈接 Name Node,而是直接從 Data Node 讀取數據。但鑑於 Data Node 的線程模型,這種方案會對 Data Node 形成較大沖擊。
Mapper 的 Shuffle 輸出數據仍然按原方案寫本地磁盤,寫完後上傳到 HDFS。Reducer 仍然按原始方案經過 Mapper 側的 External Shuffle Service 讀取 Shuffle 數據。若是失敗了,則從 HDFS 讀取。這種方案極大減小了對 HDFS 的訪問頻率。
該方案上線近一年:
小時級做業性能提高 12%。
該方案旨在提高 Spark Shuffle 穩定性從而提高做業穩定性,但最終沒有使用方差等指標來衡量穩定性的提高。緣由在於天天集羣負載不同,總體方差較大。Shuffle 穩定性提高後,Stage Retry 大幅減小,總體做業執行時間減小,也即性能提高。最終經過對比使用該方案先後的總的做業執行時間來對比性能的提高,用於衡量該方案的效果。
如上文所分析,Shuffle 性能問題的緣由在於,Shuffle Write 由 Mapper 完成,而後 Reducer 須要從全部 Mapper 處讀取數據。這種模型,咱們稱之爲以 Mapper 爲中心的 Shuffle。它的問題在於:
回答:歷史數據太多,不適合修改歷史數據。
回答:通常而言,用戶修改數據都是以 Partition 爲單位。因此咱們在 Partition Parameter 上保存了物化列相關信息。若是用戶的查詢同時包含了新 Partition 與歷史 Partition,咱們會在新 Partition 上針對物化列進行 SQL Rewrite,歷史 Partition 不 Rewrite,而後將新老 Partition 進行 Union,從而在保證數據正確性的前提下儘量充分利用物化列的優點。
回答:目前咱們主要是經過一些審計信息輔助人工分析。同時咱們也正在作物化列與物化視圖的推薦服務,最終作到智能建設物化列與物化視圖。
回答:這個想法挺好,咱們以前也考慮過,但基於幾點考慮,最終沒有這樣作。第一,單 Mapper 的 Shuffle 輸出數據量通常很小,上傳到 HDFS 耗時在 2 秒之內,這個時間開銷能夠忽略;第二,咱們普遍使用 External Shuffle Service 和 Dynamic Allocation,Mapper 執行完成後可能 Executor 就回收了,若是要異步上傳,就必須依賴其它組件,這會提高複雜度,ROI 較低。
上海沙龍回顧 | Apache Kylin 原理介紹與新架構分享(Kylin On Parquet)
字節跳動技術沙龍邀請來自字節跳動及業內互聯網公司的技術專家,分享熱門技術話題與一線實踐經驗,內容覆蓋架構、大數據、前端、測試、運維、算法、系統等技術領域。
字節跳動技術沙龍旨在爲技術領域人才提供一個開放、自由的交流學習平臺,幫助技術人學習成長,不斷進階。