Spark SQL 之 Join 實現

原文地址:Spark SQL 之 Join 實現

 

Spark SQL 之 Join 實現

塗小剛 2017-07-19 217標籤: spark , 數據庫

Join做爲SQL中一個重要語法特性,幾乎全部稍微複雜一點的數據分析場景都離不開Join,現在Spark SQL(Dataset/DataFrame)已經成爲Spark應用程序開發的主流,做爲開發者,咱們有必要了解Join在Spark中是如何組織運行的。sql

SparkSQL整體流程介紹

在闡述Join實現以前,咱們首先簡單介紹SparkSQL的整體流程,通常地,咱們有兩種方式使用SparkSQL,一種是直接寫sql語句,這個須要有元數據庫支持,例如Hive等,另外一種是經過Dataset/DataFrame編寫Spark應用程序。以下圖所示,sql語句被語法解析(SQL AST)成查詢計劃,或者咱們經過Dataset/DataFrame提供的APIs組織成查詢計劃,查詢計劃分爲兩大類:邏輯計劃和物理計劃,這個階段一般叫作邏輯計劃,通過語法分析(Analyzer)、一系列查詢優化(Optimizer)後獲得優化後的邏輯計劃,最後被映射成物理計劃,轉換成RDD執行。數據庫

更多關於SparkSQL的解析與執行請參考文章【sql的解析與執行】。對於語法解析、語法分析以及查詢優化,本文不作詳細闡述,本文重點介紹Join的物理執行過程。數據結構

Join基本要素

以下圖所示,Join大體包括三個要素:Join方式、Join條件以及過濾條件。其中過濾條件也能夠經過AND語句放在Join條件中。分佈式

Spark支持全部類型的Join,包括:性能

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join

下面分別闡述這幾種Join的實現。優化

Join基本實現流程

整體上來講,Join的基本實現流程以下圖所示,Spark將參與Join的兩張表抽象爲流式遍歷表(streamIter)和查找表(buildIter),一般streamIter爲大表,buildIter爲小表,咱們不用擔憂哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。ui

在實際計算時,spark會基於streamIter來遍歷,每次取出streamIter中的一條記錄rowA,根據Join條件計算keyA,而後根據該keyA去buildIter中查找全部知足Join條件(keyB==keyA)的記錄rowBs,並將rowBs中每條記錄分別與rowAjoin獲得join後的記錄,最後根據過濾條件獲得最終join的記錄。spa

從上述計算過程當中不難發現,對於每條來自streamIter的記錄,都要去buildIter中查找匹配的記錄,因此buildIter必定要是查找性能較優的數據結構。spark提供了三種join實現:sort merge join、broadcast join以及hash join。3d

sort merge join實現

要讓兩條記錄能join到一塊兒,首先須要將具備相同key的記錄在同一個分區,因此一般來講,須要作一次shuffle,map階段根據join條件肯定每條記錄的key,基於該key作shuffle write,將可能join到一塊兒的記錄分到同一個分區中,這樣在shuffle read階段就能夠將兩個表中具備相同key的記錄拉到同一個分區處理。前面咱們也提到,對於buildIter必定要是查找性能較優的數據結構,一般咱們能想到hash表,可是對於一張較大的表來講,不可能將全部記錄所有放到hash表中,另外也能夠對buildIter先排序,查找時按順序查找,查找代價也是能夠接受的,咱們知道,spark shuffle階段自然就支持排序,這個是很是好實現的,下面是sort merge join示意圖。code

在shuffle read階段,分別對streamIter和buildIter進行merge sort,在遍歷streamIter時,對於每條記錄,都採用順序查找的方式從buildIter查找對應的記錄,因爲兩個表都是排序的,每次處理完streamIter的一條記錄後,對於streamIter的下一條記錄,只需從buildIter中上一次查找結束的位置開始查找,因此說每次在buildIter中查找沒必要重頭開始,總體上來講,查找性能仍是較優的。

broadcast join實現

爲了能具備相同key的記錄分到同一個分區,咱們一般是作shuffle,那麼若是buildIter是一個很是小的表,那麼其實就沒有必要大動干戈作shuffle了,直接將buildIter廣播到每一個計算節點,而後將buildIter放到hash表中,以下圖所示。

從上圖能夠看到,不用作shuffle,能夠直接在一個map中完成,一般這種join也稱之爲map join。那麼問題來了,何時會用broadcast join實現呢?這個不用咱們擔憂,spark sql自動幫咱們完成,當buildIter的估計大小不超過參數spark.sql.autoBroadcastJoinThreshold設定的值(默認10M),那麼就會自動採用broadcast join,不然採用sort merge join。

hash join實現

除了上面兩種join實現方式外,spark還提供了hash join實現方式,在shuffle read階段不對記錄排序,反正來自兩格表的具備相同key的記錄會在同一個分區,只是在分區內不排序,未來自buildIter的記錄放到hash表中,以便查找,以下圖所示。

不難發現,要未來自buildIter的記錄放到hash表中,那麼每一個分區來自buildIter的記錄不能太大,不然就存不下,默認狀況下hash join的實現是關閉狀態,若是要使用hash join,必須知足如下四個條件:

  • buildIter整體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,即不知足broadcast join條件
  • 開啓嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false
  • 每一個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每一個分區來自buildIter的記錄要能放到內存中
  • streamIter的大小是buildIter三倍以上

因此說,使用hash join的條件實際上是很苛刻的,在大多數實際場景中,即便能使用hash join,可是使用sort merge join也不會比hash join差不少,因此儘可能使用hash

下面咱們分別闡述不一樣Join方式的實現流程。

inner join

inner join是必定要找到左右表中知足join條件的記錄,咱們在寫sql語句或者使用DataFrmae時,能夠不用關心哪一個是左表,哪一個是右表,在spark sql查詢優化階段,spark會自動將大表設爲左表,即streamIter,將小表設爲右表,即buildIter。這樣對小表的查找相對更優。其基本實現流程以下圖所示,在查找階段,若是右表不存在知足join條件的記錄,則跳過。

left outer join

left outer join是以左表爲準,在右表中查找匹配的記錄,若是查找失敗,則返回一個全部字段都爲null的記錄。咱們在寫sql語句或者使用DataFrmae時,通常讓大表在左邊,小表在右邊。其基本實現流程以下圖所示。

right outer join

right outer join是以右表爲準,在左表中查找匹配的記錄,若是查找失敗,則返回一個全部字段都爲null的記錄。因此說,右表是streamIter,左表是buildIter,咱們在寫sql語句或者使用DataFrmae時,通常讓大表在右邊,小表在左邊。其基本實現流程以下圖所示。

full outer join

full outer join相對來講要複雜一點,整體上來看既要作left outer join,又要作right outer join,可是又不能簡單地先left outer join,再right outer join,最後union獲得最終結果,由於這樣最終結果中就存在兩份inner join的結果了。由於既然完成left outer join又要完成right outer join,因此full outer join僅採用sort merge join實現,左邊和右表既要做爲streamIter,又要做爲buildIter,其基本實現流程以下圖所示。

因爲左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,若是key相等,則joinrowA和rowB,並將rowA和rowB分別更新到左表和右表的下一條記錄;若是keyA<keyB,則說明右表中沒有與左表rowA對應的記錄,那麼joinrowA與nullRow,緊接着,rowA更新到左表的下一條記錄;若是keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那麼joinnullRow與rowB,緊接着,rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄所有處理完。

left semi join

left semi join是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則僅返回左邊的記錄,不然返回null,其基本實現流程以下圖所示。

left anti join

left anti join與left semi join相反,是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則返回null,不然僅返回左邊的記錄,其基本實現流程以下圖所示。

總結

Join是數據庫查詢中一個很是重要的語法特性,在數據庫領域能夠說是「得join者的天下」,SparkSQL做爲一種分佈式數據倉庫系統,給咱們提供了全面的join支持,並在內部實現上無聲無息地作了不少優化,瞭解join的實現將有助於咱們更深入的瞭解咱們的應用程序的運行軌跡。

 

原文地址:Spark SQL 之 Join 實現

相關文章
相關標籤/搜索