內容來源:本文內容由阿里大數據計算服務(MaxCompute)團隊投稿提供。IT 大咖說經受權發佈,轉載請標明出處。sql
閱讀字數:5502 | 14分鐘閱讀安全
目前hash clustering table已經在阿里巴巴內部集羣生產環境正式發佈,而且已經有螞蟻、安所有、菜鳥等多個BU參與了試用。從螞蟻的反饋來看,改造以後的任務收效很是明顯,運行時間縮短40%到80%,節省計算資源23%到67%。bash
對於增量更新的場景,能夠利用 MaxCompute2.0的新特性,對語句作簡單改造,從而大幅提高性能,節約集羣資源。併發
在數據開發的過程當中,每每會進行分層的設計,在ODS層中,一種很是常見的場景是使用一個增量表delta對一個存量表snapshot進行更新。例如snapshot表存儲全部的會員信息,而增量表中包括新增會員信息和原有會員信息屬性的一些修改;或者snapshot表存儲最近一個月的訂單信息,delta表存儲了新增訂單以及物流的更新等等。性能
對於這種任務,每每有如下幾個特徵:
大數據
snapshot表存儲量巨大,delta表相對較小優化
snapshot表和delta表擁有一致的schemaspa
snapshot和delta表中存在主鍵key,且key可能有重合(不然能夠經過簡單的union all來完成)設計
上一個週期的snapshot + 當前週期的delta => 當前週期的snapshotcode
爲了完成上述的功能,對應的sql邏輯通常使用full outer join,簡單起見,咱們的snapshot和delta表只有兩列
(key string, value string)複製代碼
其中key爲主鍵
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT
CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
CASE WHEN d.key IS NULL THEN s.value ELSE d.value END
FROM
(SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
(SELECT * FROM delta WHERE ds = '20170102')d
ON s.key = d.key;複製代碼
這個語句表示,對於delta表存在的數據,使用delta表的值,對於delta表不存在的數據,使用snapshot的值。
可是在實際執行的過程當中,雖然只是進行簡單的join操做,可是因爲存量表可能很是大(幾T到幾十T的規模),這種任務每每耗時很是長,有些任務甚至須要耗費一天的時間才能產出。這種任務是否存在優化的空間呢?咱們能夠分析一個線上實際的大表full outer join的執行計劃。
能夠看到M1是snapshot表,須要將近20000的併發,M2是delta表,只有9個併發,而爲了進行join的操做,兩邊會各自進行shuffle,在J3階段進行sort-merge-join的計算。實際執行過程當中,M2只須要幾分鐘,M1須要十幾分鍾,而在J3階段則每每須要一兩個小時,由於J3只有3000個併發,可是卻讀取了上游將近20000個併發讀取的數據,至關於併發減少到原來的15%,處理的數據量倒是同樣的,固然耗時會長。另外,從M1到J3這個路徑的shuffle中間存在大表的一次讀寫+兩次排序,並且在數據量較大的狀況下,還有可能會發生數據spill,使得運行性能更差。
在這種狀況下,爲了縮短執行時間,一般能夠調大join階段的instance數目,增長join階段的內存減小spill等,可是instance的數目不能無限增加,不然會因爲shuffle規模太大形成集羣壓力過大,另外內存的資源也是有限的,因此調整參數也只是犧牲資源換取時間,治標不治本。
爲了對這個場景進行完全的優化,咱們但願能徹底消除掉大表的shuffle階段,將M1和J3合二爲一,這樣大表數據只須要讀寫一次,並且免去了中間排序的過程,執行時間能夠縮短一半甚至更多。有調優經驗的同窗可能已經想到了mapjoin,可是這裏的delta表每每數據較多沒法當作mapjoin的小表,另外mapjoin沒法支持full outer join,這兩個限制都沒法繞過,因此這個方案只能被pass了。那麼這個shuffle的階段應該如何省去呢?這裏就要引入咱們今天介紹的功能,hash clustering table了。
Hash clustering,簡而言之,就是將數據提早進行shuffle和排序,在使用數據的過程當中,讀取數據後直接參與計算。這種模式很是適合產出後後續節點屢次按照相同key進行join或者聚合的場景。固然生成hash clustering table自己也是有代價的,在生成階段會進行一次額外的shuffle。所以,這個功能並非對於全部的場景都有效,例如數據生成以後只使用了一次,那麼這個shuffle在生成表的階段進行仍是在讀表以後進行其實並無什麼區別。可是對於特定的場景,這個特性能夠起到顯著的效果。
根據這個方案,咱們重建一下snapshot表
ALTER TABLE snapshot CLUSTERED BY (key) SORTED BY (key) INTO 100 BUCKETS;複製代碼
注意這個100 bucket須要根據實際數據規模進行設置,這裏只是示例,不要照抄^_^
而後重建一下ds='20170101'的數據
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170101')
SELECT key, value
FROM snapshot
WHERE ds='20170101'複製代碼
注意,這個過程因爲會有一個額外的shuffle階段,因此耗時會比普通的insert overwrite長。
第一次嘗試: full outer join
數據準備完成後,從新執行剛纔的full outer join語句
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT
CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
CASE WHEN d.key IS NULL THEN s.value ELSE d.value END
FROM
(SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
(SELECT * FROM delta WHERE ds = '20170102')d
ON s.key = d.key;複製代碼
讓咱們看下執行計劃
結果好像不盡如人意,M1讀取了delta表,M2讀取了snapshot表而且進行了sort-merge-join操做,可是讀取完成之後數據從新進行了一次shuffle才寫入了ds='20170102'分區,爲何會這樣呢?
緣由是ds='20170102'這個分區也是一個hash clustering table的分區,在寫入的過程當中,也須要數據按照特定key進行shuffle,雖然ds='20170101'的數據是shuffle過了的,可是在後續的full outer join的過程當中,可能會存在補null的行爲,並不能保證輸出數據依然符合shuffle的特徵,因此須要進行一次reshuffle。
其實,這個sql經過CASE WHEN d.id IS NULL THEN s.id ELSE d.id END在語義上實際是保證了不會出現額外補null的行爲的,可是這個行爲目前咱們的優化器還不能識別,因此這種狀況下大表數據依然會有一次shuffle,這並不能讓咱們滿意。
第二次嘗試: not in + union all
下一個問題是如何才能讓優化器識別出來咱們其實並無改變shuffle的屬性呢,咱們觀察到這個full outer join其實這個sql就是一個求並集的過程
那麼整個這個sql能夠被拆分爲兩部分
SELECT a.key, a.value
FROM (SELECT * FROM snapshot WHERE ds='20170101' AND KEY NOT IN
(SELECT key FROM delta WHERE ds='20170102')) a -- snapshot_not_in_delta
UNION ALL
SELECT key, value FROM delta WHERE ds='20170102' -- delta_all複製代碼
在上述兩部分中,前一部分對應圖中的藍色部分,後一部分對應圖中的綠色部分。咱們僅僅是對snapshot的key列進行了過濾操做,並無改變key的分佈,因此這個語句能夠省去一次額外的shuffle。可是MaxCompute對於not in有一個限制是結果集合不能超過2000條,這個又限制了這種寫法的應用場景。
最終方案: anti semi join + union all
好在MaxCompute2.0中新支持的anti semi join一樣實現了not in的語義,並且對結果集大小並無限制,使用anti semi join 這個語句能夠進一步修改成
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT s.key, s.value
FROM (SELECT * FROM snapshot WHERE ds='20170101') s
LEFT ANTI JOIN
(SELECT * FROM delta WHERE ds='20170102') d ON s.key = d.key
UNION ALL
SELECT key, value
FROM delta
WHERE ds='20170102';複製代碼
通過這一步的改造後,讓咱們運行一下,看看發生了什麼。
只有三個階段,M1讀取delta表,M2讀取snapshot表並進行sort-merge-join,隨後寫出數據,最後一個R3階段僅僅是一個收集信息的任務,耗時在秒級別,因此實際的處理階段只有兩個stage,其中M1合併了以前M1和J3的功能,因爲省去了一次數據讀寫、排序以及可能的spill等操做,實際運行時間每每能夠減半。
上面也說過,若是數據只是進行一次讀寫,其實hash clustering table的做用有限,可是在增量更新這個特定的場景下,咱們的輸入和輸出都爲hash clustering的數據,並且中間過程並無對cluster key進行修改,只是進行了過濾,因此咱們能夠只在一個階段中完成read->join->union all->write這四個操做,極大地縮短了運行時間。
目前hash clustering table已經在阿里巴巴內部集羣生產環境正式發佈,而且已經有螞蟻、安所有、菜鳥等多個BU參與了試用。
從螞蟻的反饋來看,改造以後的任務收效很是明顯,運行時間縮短40%到80%,節省計算資源23%到67%。
菜鳥在使用hash clustering以後,任務的執行計劃有所變化,節省了以前join操做須要的shuffle等操做,任務執行時間從40分鐘左右下降到20分鐘之內,有效的提高了任務執行效率,縮短執行時間,節約了資源。
飛豬應用Hash Clustering後,對於計算,整個計算過程由優化前的3小時,縮短到40分鐘內完成,對於明細事實表視圖一次讀取計算可在1分鐘內完成;對於存儲,節省的存儲和數據膨脹程度是線性關係,採用視圖形式,咱們用很是小的計算消耗代價節省了80%的存儲,這一點看來,是很值得的。
因此咱們付出的代價,僅僅是將表的屬性進行修改,而且提早進行一次數據生成操做,這個操做也只須要執行一次,一勞永逸。
最後,歡迎你們在本身的增量更新的任務使用hash clustering功能,從現有的經驗來看,大表的數據越多,收益越明顯。
bucket的數目設置須要一些經驗,bucket越多,併發越多,運行越快,可是若是文件自己不大,小文件也越多,目前推薦500MB~1GB設置一個bucket,超大規模數據狀況下一個bucket的數據能夠更多。在任何狀況下,不建議設置bucket number超過4096。
hash clustering table會對數據進行重排,在一些極端場景下,可能會致使原來壓縮率較高的文件壓縮率下降,影響後續的性能,這個能夠經過觀察生成表的summary的input/output bytes來確認
目前咱們正在對decimal類型進行重構,重構以後可能會影響decimal類型的分佈方式,因此clustered key不要選用decimal類型
snapshot表和delta表的schema不須要徹底一致,可是若是key的類型不一樣,好比一邊是bigint,一邊是string,在join的時候須要將delta表的類型轉換爲snapshot的key類型,不然依然會須要一次reshuffle。
以上爲今天的分享內容,謝謝你們!