SparkSQL大數據實戰:揭開Join的神祕面紗

本文來自 網易雲社區 。算法

 

Join操做是數據庫和大數據計算中的高級特性,大多數場景都須要進行復雜的Join操做,本文從原理層面介紹了SparkSQL支持的常見Join算法及其適用場景。sql

Join背景介紹

Join是數據庫查詢永遠繞不開的話題,傳統查詢SQL技術整體能夠分爲簡單操做(過濾操做-where、排序操做-limit等),聚合操做-groupby以及Join操做等。其中Join操做是最複雜、代價最大的操做類型,也是OLAP場景中使用相對較多的操做。所以頗有必要對其進行深刻研究。數據庫

 

另外,從業務層面來說,用戶在數倉建設的時候也會涉及Join使用的問題。一般狀況下,數據倉庫中的表通常會分爲「低層次表」和「高層次表」。緩存

 

所謂「低層次表」,就是數據源導入數倉以後直接生成的表,單表列值較少,通常能夠明顯歸爲維度表或事實表,表和表之間大多存在外健依賴,因此查詢起來會遇到大量Join運算,查詢效率不好。而「高層次表」是在「低層次表」的基礎上加工轉換而來,一般作法是使用SQL語句將須要Join的表預先進行合併造成「寬表」,在寬表上的查詢不須要執行大量Join,效率很高。但寬表缺點是數據會有大量冗餘,且相對生成較滯後,查詢結果可能並不及時。併發

 

爲了得到時效性更高的查詢結果,大多數場景都須要進行復雜的Join操做。Join操做之因此複雜,主要是一般狀況下其時間空間複雜度高,且有不少算法,在不一樣場景下須要選擇特定算法才能得到最好的優化效果。本文將介紹SparkSQL所支持的幾種常見的Join算法及其適用場景。分佈式

Join常見分類以及基本實現機制

當前SparkSQL支持三種Join算法:shuffle hash join、broadcast hash join以及sort merge join。其中前二者歸根到底都屬於hash join,只不過在hash join以前須要先shuffle仍是先broadcast。其實,hash join算法來自於傳統數據庫,而shuffle和broadcast是大數據的皮(分佈式),二者一結合就成了大數據的算法了。所以能夠說,大數據的根就是傳統數據庫。既然hash join是「內核」,那就刨出來看看,看完把「皮」再分析一下。函數

hash join

先來看看這樣一條SQL語句:select * from order,item where item.id = order.i_id,很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。如今假設這個Join採用的是hash join算法,整個過程會經歷三步:性能

  1. 肯定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就能夠join在一塊兒。一般狀況下,小表會做爲Build Table,大表做爲Probe Table。此事例中item爲Build Table,order爲Probe Table。
  2. 構建Hash Table:依次讀取Build Table(item)的數據,對於每一行數據根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。數據緩存在內存中,若是內存放不下須要dump到外存。
  3. 探測:再依次掃描Probe Table(order)的數據,使用相同的hash函數映射Hash Table中的記錄,映射成功以後再檢查join條件(item.id = order.i_id),若是匹配成功就能夠將二者join在一塊兒。

 

基本流程能夠參考上圖,這裏有兩個小問題須要關注:大數據

  1. hash join性能如何?很顯然,hash join基本都只掃描兩表一次,能夠認爲o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街。
  2. 爲何Build Table選擇小表?道理很簡單,由於構建的Hash Table最好能所有加載在內存,效率最高;這也決定了hash join算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用。

上文說過,hash join是傳統數據庫中的單機join算法,在分佈式環境下須要通過必定的分佈式改造,就是儘量利用分佈式計算資源進行並行化計算,提升整體效率。hash join分佈式改造通常有兩種經典方案:優化

  1. broadcast hash join:將其中一張小表廣播分發到另外一張大表所在的分區節點上,分別併發地與其上的分區記錄進行hash join。broadcast適用於小表很小,能夠直接廣播的場景。
  2. shuffler hash join:一旦小表數據量較大,此時就再也不適合進行廣播分發。這種狀況下,能夠根據join key相同必然分區相同的原理,將兩張表分別按照join key進行從新組織分區,這樣就能夠將join分而治之,劃分爲不少小join,充分利用集羣資源並行化。

下面分別進行詳細講解。

broadcast hash join

以下圖所示,broadcast hash join能夠分爲兩步:

  1. broadcast階段:將小表廣播分發到大表所在的全部主機。廣播算法能夠有不少,最簡單的是先發給driver,driver再統一分發給全部executor;要不就是基於BitTorrent的TorrentBroadcast。
  2. hash join階段:在每一個executor上執行單機版hash join,小表映射,大表試探。

        3.SparkSQL規定broadcast hash join執行的基本條件爲被廣播小表必須小於參數spark.sql.autoBroadcastJoinThreshold,默認爲10M。

shuffle hash join

在大數據條件下若是一張表很小,執行join操做最優的選擇無疑是broadcast hash join,效率最高。可是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join就再也不是最優方案。此時能夠按照join key進行分區,根據key相同必然分區相同的原理,就能夠將大表join分而治之,劃分爲不少小表的join,充分利用集羣資源並行化。以下圖所示,shuffle hash join也能夠分爲兩步:

  1. shuffle階段:分別將兩個表按照join key進行分區,將相同join key的記錄重分佈到同一節點,兩張表的數據會被重分佈到集羣中全部節點。這個過程稱爲shuffle。
  2. hash join階段:每一個分區節點上的數據單獨執行單機hash join算法。

 

看到這裏,能夠初步總結出來若是兩張小表join能夠直接使用單機版hash join;若是一張大表join一張極小表,能夠選擇broadcast hash join算法;而若是是一張大表join一張小表,則能夠選擇shuffle hash join算法;那若是是兩張大表進行join呢?

sort merge join

SparkSQL對兩張大表join採用了全新的算法-sort-merge join,以下圖所示,整個過程分爲三個步驟:

 

  1. shuffle階段:將兩張大表根據join key進行從新分區,兩張表數據會分佈到整個集羣,以便分佈式並行處理。
  2. sort階段:對單個分區節點的兩表數據,分別進行排序。
  3. merge階段:對排好序的兩張分區表數據執行join操做。join操做很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,不然取更小一邊。以下圖所示:

 

通過上文的分析,很明顯能夠得出來這幾種Join的代價關係:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),數據倉庫設計時最好避免大表與大表的join查詢,SparkSQL也能夠根據內存資源、帶寬資源適量將參數spark.sql.autoBroadcastJoinThreshold調大,讓更多join實際執行爲broadcast hash join。

總結

Join操做是數據庫和大數據計算中的高級特性,由於其獨特的複雜性,不多有同窗可以講清楚其中的原理。本文試圖帶你們真正走進Join的世界,瞭解經常使用的幾種Join算法以及各自的適用場景。後面兩篇文章將會在此基礎上不斷深刻Join內部,一點一點地揭開它的面紗,敬請關注!

 

本文已由做者範欣欣受權網易雲社區發佈,原文連接:SparkSQL大數據實戰:揭開Join的神祕面紗

相關文章
相關標籤/搜索