做者:王道遠,花名健身, 阿里巴巴計算平臺EMR技術專家。
背景
EMR Spark提供的Relational Cache功能,能夠經過對數據模型進行預計算和高效地存儲,加速Spark SQL,爲客戶實現利用Spark SQL對海量數據進行即時查詢的目的。Relational Cache的工做原理相似物化視圖,在用戶提交SQL語句時對語句進行分析,並選出可用的預計算結果來加速查詢。爲了實現高效地預計算結果複用,咱們構建的預計算緩存通常都較爲通用,所以對於用戶query,還需進行進一步的計算方能得到最終結果。所以,如何快速地找出匹配的緩存,並構建出準確的新執行計劃,就顯得尤其重要。nginx
在Hive 3.x中支持的Materialized View,利用了Apache Calcite對執行計劃進行重寫。考慮到Spark SQL使用Catalyst進行執行計劃優化,引入Calcite過重,所以EMR Spark中的Relational Cache實現了本身的Catalyst規則,用於重寫執行計劃。本文將介紹執行計劃重寫的相關內容。sql
執行計劃重寫
準備工做
Spark會把用戶查詢語句進行解析,依次轉化爲Unresolved Logical Plan(未綁定的邏輯計劃)、Resolved Logical Plan(綁定的邏輯計劃)、Optimized Logical Plan(優化的邏輯計劃)、Physical Plan(物理計劃)。其中,未優化的邏輯計劃根據用戶查詢語句不一樣,會有較大區別,而Relational Cache做爲優化的一部分,放在邏輯計劃優化過程當中也較爲合適,所以咱們拿到的用戶查詢計劃會是優化中的邏輯計劃。要與優化中的邏輯計劃匹配,咱們選擇把這個重寫過程放在Spark優化器比較靠後的步驟中,同時,預先將Relational Cache的邏輯計劃進行解析,得到優化後的Cache計劃,減少匹配時的複雜程度。這樣,咱們只需匹配作完了謂詞下推、謂詞合併等等優化以後的兩個邏輯計劃。數據庫
基本過程
在匹配時,咱們但願能儘量多得匹配計算和IO操做,所以,咱們對目標計劃進行前序遍歷,依次進行匹配,嘗試找到最多的匹配節點。而在判斷兩個節點是否匹配時,咱們採用後序遍歷的方式,但願儘快發現不匹配的狀況,減小計劃匹配的執行時間。而後咱們會根據匹配結果,對計劃進行重寫,包括對於Cache數據進行進一步的Filter、Project、Sort甚至Aggregate等操做,使其與匹配節點徹底等價,而後更新邏輯計劃節點的引用綁定,無縫替換到邏輯計劃中,這樣就能輕鬆得到最終的重寫後的計劃。緩存
Join匹配
Spark中的Join都是二元操做,而實際的Join順序可能根據一些策略會有很大區別,所以對於Join節點,必須進行特殊處理。咱們會首先將邏輯計劃進行處理,根據緩存計劃的Join順序進行Join重排。這一步在樹狀匹配以前就進行了,避免不斷重複Join重排帶來的時間浪費。重排後的Join能夠更大機率地被咱們匹配到。微信
爲了實現Cache的通用性,根據星型數據模型的特色,咱們引入了Record Preserve的概念。這和傳統數據庫中的Primary Key/Foreign Key的關係較爲相似,當有主鍵的表與非空外鍵指向的表在外鍵上進行Join時,記錄的條數不會變化,不會膨脹某條記錄,也不會丟失某條記錄。PK/FK的語意在大數據處理框架中常常缺失,咱們引入了新的DDL讓用戶自定義Record Preserve Join的關係。當用戶定義A Inner Join B是對於A表Record Preserve時,咱們也會把A Inner Join B和A的關係匹配起來。有了PK/FK的幫助,咱們能匹配上的狀況大大增長了,一個Relational Cache能夠被更多看似區別巨大的查詢共享,這能夠很好的爲用戶節約額外的存儲開銷和預計算開銷。app
Aggregate匹配
通常的Aggregate匹配較爲簡單,而Spark支持的Grouping Set操做,會構建出Expand邏輯計劃節點,至關於把一條記錄轉爲多條,使用Grouping ID進行標記。因爲Expand的子節點是全部Grouping的狀況共用的,這裏咱們只對子節點進行一次匹配,再分別進行上面的Grouping屬性和Aggregate屬性的匹配。主要是驗證目標聚合所需的屬性或者聚合函數都能從某個Grouping ID對應的聚合結果中計算出來,好比粗粒度的Sum能夠對細粒度的Sum進行二次Sum求和,而粗粒度的Count對細粒度的Count也應經過二次Sum求和,粗粒度的Average沒法僅從細粒度的Average中還原出來等等。框架
計劃重寫
找出匹配的邏輯計劃以後,就是重寫邏輯計劃的過程。對於無需二次聚合的邏輯計劃,直接根據緩存數據的schema,從緩存數據的Relation中選擇所需列,根據條件過濾後,進行後續操做。若是還需二次聚合,選擇所需列時需保留外部要用的全部列,以及聚合時須要的列,還有聚合函數須要的數據。二次聚合的聚合函數須要根據實際狀況進行重寫,確保能使用Relational Cache中已經初步聚合的結果。這裏面須要根據聚合的語意判斷是否可以二次聚合。若是時Grouping Set的聚合,二次聚合以前還需選擇正確的Grouping ID進行過濾。通過二次聚合後,步驟大致和普通的重寫一致,只需替換到目標計劃中便可。函數
結果
咱們以一個例子來具體說明邏輯計劃的重寫結果。Star Schema Benchmark(論文連接https://www.cs.umb.edu/~poneil/StarSchemaB.pdf)是星型模型數據分析的一個標準Benchmark,其結構定義如圖所示:性能
咱們構建Relational Cache的SQL語句以下:測試
SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUMFROM supplier, p_lineorder, dates, customer, partWHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkeyGROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
咱們從中選出一條查詢做爲示例。具體查詢語句:
select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, lineorder, supplier, dates where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and c_nation = 'UNITED KINGDOM' and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and s_nation = 'UNITED KINGDOM' and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc
原始邏輯計劃以下所示:
Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L] +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322)) +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
因而可知,執行計劃大大簡化,咱們能夠作到亞秒級響應用戶的命中查詢。
進一步優化
在實際測試過程當中,咱們發現當多個Relational Cache存在時,匹配時間線性增加明顯。因爲咱們在metastore中存儲的是Cache的SQL語句,取SQL語句和再次解析的時間都不容小覷,這就使得匹配過程明顯增加,背離了咱們追求亞秒級響應的初衷。所以咱們在Spark中構建了邏輯計劃緩存,將解析過的Relational Cache的計劃緩存在內存中,每一個Relational Cache只緩存一份,計劃自己佔用空間有限,所以咱們能夠緩存住幾乎全部的Relational Cache的優化後的邏輯計劃,從而在第一次查詢以後,全部查詢都再也不收到取SQL語句和再次解析的延遲困擾。通過這樣的優化,匹配時間大幅減小到100ms的量級。
總結與思考
Relational Cache實現了一種基於Cache的優化方案,讓Spark SQL可以用於即時查詢的場景下,知足用戶對海量數據秒級查詢的需求。經過對用戶查詢的動態改寫,能夠大大提升緩存的利用率,擴展緩存的命中場景,有效提升查詢性能。現有方案也有不少可優化的地方,好比重複的回溯遍歷時間複雜度較高,不如在邏輯計劃節點內部更新維護可匹配的信息。考慮到對Spark的侵入性,咱們暫時選擇了現有方案,後續根據實際的使用狀況,還會進一步優化咱們的邏輯計劃重寫過程。而重寫的邏輯計劃還涉及到基於不一樣的Relational Cache Plan會有不一樣的重寫方式,在這些重寫結果中如何根據執行代價選擇最優的重寫方案,將會在後續文章中進行揭祕,敬請期待!
本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。