SparkSQL的3種Join實現

引言Join是SQL語句中的經常使用操做,良好的表結構可以將數據分散在不一樣的表中,使其符合某種範式,減小表冗餘、更新容錯等。而創建表和表之間關係的最佳方式就是Join操做。web

對於Spark來講有3中Join的實現,每種Join對應着不一樣的應用場景:算法

Broadcast Hash Join :適合一張較小的表和一張大表進行joinsql

Shuffle Hash Join : 適合一張小表和一張大表進行join,或者是兩張小表之間的join數據庫

Sort Merge Join :適合兩張較大的表之間進行join緩存

前二者都基於的是Hash Join,只不過在hash join以前須要先shuffle仍是先broadcast。下面將詳細的解釋一下這三種不一樣的join的具體原理。併發

Hash Join先來看看這樣一條SQL語句:分佈式

select * from order,item where item.id = order.i_id函數

  1. 肯定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就能夠join在一塊兒。一般狀況下,小表會做爲Build Table,大表做爲Probe Table。此事例中item爲Build Table,order爲Probe Table;很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。如今假設這個Join採用的是hash join算法,整個過程會經歷三步:
  2. 構建Hash Table:依次讀取Build Table(item)的數據,對於每一行數據根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。數據緩存在內存中,若是內存放不下須要dump到外存;
  3. 探測:再依次掃描Probe Table(order)的數據,使用相同的hash函數映射Hash Table中的記錄,映射成功以後再檢查join條件(item.id = order.i_id),若是匹配成功就能夠將二者join在一塊兒。

基本流程能夠參考上圖,這裏有兩個小問題須要關注:性能

  1. hash join性能如何?很顯然,hash join基本都只掃描兩表一次,能夠認爲o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街;
  2. 爲何Build Table選擇小表?道理很簡單,由於構建的Hash Table最好能所有加載在內存,效率最高;這也決定了hash join算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用。

上文說過,hash join是傳統數據庫中的單機join算法,在分佈式環境下須要通過必定的分佈式改造,說到底就是儘量利用分佈式計算資源進行並行化計算,提升整體效率。hash join分佈式改造通常有兩種經典方案:大數據

  1. broadcast hash join:將其中一張小表廣播分發到另外一張大表所在的分區節點上,分別併發地與其上的分區記錄進行hash join。broadcast適用於小表很小,能夠直接廣播的場景;
  2. shuffler hash join:一旦小表數據量較大,此時就再也不適合進行廣播分發。這種狀況下,能夠根據join key相同必然分區相同的原理,將兩張表分別按照join key進行從新組織分區,這樣就能夠將join分而治之,劃分爲不少小join,充分利用集羣資源並行化。

Broadcast Hash Join你們知道,在數據庫的常見模型中(好比星型模型或者雪花模型),表通常分爲兩種:事實表和維度表。維度表通常指固定的、變更較少的表,例如聯繫人、物品種類等,通常數據有限。而事實表通常記錄流水,好比銷售清單等,一般隨着時間的增加不斷膨脹。

由於Join操做是對兩個表中key值相同的記錄進行鏈接,在SparkSQL中,對兩個表作Join最直接的方式是先根據key分區,再在每一個分區中把key值相同的記錄拿出來作鏈接操做。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操做,咱們應該儘量的設計Spark應用使其避免大量的shuffle。

當維度表和事實表進行Join操做時,爲了不shuffle,咱們能夠將大小有限的維度表的所有數據分發到每一個節點上,供事實表使用。executor存儲維度表的所有數據,必定程度上犧牲了空間,換取shuffle操做大量的耗時,這在SparkSQL中稱做Broadcast Join,以下圖所示:

Table B是較小的表,黑色表示將其廣播到每一個executor節點上,Table A的每一個partition會經過block manager取到Table A的數據。根據每條記錄的Join Key取到Table B中相對應的記錄,根據Join Type進行操做。這個過程比較簡單,不作贅述。

Broadcast Join的條件有如下幾個:

  1. 被廣播的表須要小於spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M (或者加了broadcast join的hint)
  2. 基表不能被廣播,好比left outer join時,只能廣播右表

看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,不然數據的冗餘傳輸就遠大於shuffle的開銷;另外,廣播時須要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的內存也是一個考驗。

以下圖所示,broadcast hash join能夠分爲兩步:

  1. broadcast階段:將小表廣播分發到大表所在的全部主機。廣播算法能夠有不少,最簡單的是先發給driver,driver再統一分發給全部executor;要不就是基於bittorrete的p2p思路;
  2. hash join階段:在每一個executor上執行單機版hash join,小表映射,大表試探;

SparkSQL規定broadcast hash join執行的基本條件爲被廣播小表必須小於參數spark.sql.autoBroadcastJoinThreshold,默認爲10M。

Shuffle Hash Join當一側的表比較小時,咱們選擇將其廣播出去以免shuffle,提升性能。但由於被廣播的表首先被collect到driver段,而後被冗餘分發到每一個executor上,因此當表比較大時,採用broadcast join會對driver端和executor端形成較大的壓力。

但因爲Spark是一個分佈式的計算引擎,能夠經過分區的形式將大批量的數據劃分紅n份較小的數據集進行並行計算。這種思想應用到Join上即是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,兩個表中,key相同的行都會被shuffle到同一個分區中,SparkSQL將較大表的join分而治之,先將表劃分紅n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在必定程度上減小了driver廣播一側表的壓力,也減小了executor端取整張被廣播表的內存消耗。其原理以下圖:

Shuffle Hash Join分爲兩步:

  1. 對兩張表分別按照join keys進行重分區,即shuffle,目的是爲了讓有相同join keys值的記錄分到對應的分區中
  2. 對對應分區中的數據進行join,此處先將小表分區構造爲一張hash表,而後根據大表分區中記錄的join keys值拿出來進行匹配

Shuffle Hash Join的條件有如下幾個:

  1. 分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M
  2. 基表不能被廣播,好比left outer join時,只能廣播右表
  3. 一側的表要明顯小於另一側,小的一側將被廣播(明顯小於的定義爲3倍小,此處爲經驗值)

咱們能夠看到,在必定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行從新分區,而且對小表中的分區進行hash化,從而完成join。在保持必定複雜度的基礎上,儘可能減小driver和executor的內存壓力,提高了計算時的穩定性。

在大數據條件下若是一張表很小,執行join操做最優的選擇無疑是broadcast hash join,效率最高。可是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join就再也不是最優方案。此時能夠按照join key進行分區,根據key相同必然分區相同的原理,就能夠將大表join分而治之,劃分爲不少小表的join,充分利用集羣資源並行化。以下圖所示,shuffle hash join也能夠分爲兩步:

  1. shuffle階段:分別將兩個表按照join key進行分區,將相同join key的記錄重分佈到同一節點,兩張表的數據會被重分佈到集羣中全部節點。這個過程稱爲shuffle
  2. hash join階段:每一個分區節點上的數據單獨執行單機hash join算法。

看到這裏,能夠初步總結出來若是兩張小表join能夠直接使用單機版hash join;若是一張大表join一張極小表,能夠選擇broadcast hash join算法;而若是是一張大表join一張小表,則能夠選擇shuffle hash join算法;那若是是兩張大表進行join呢?

Sort Merge Join上面介紹的兩種實現對於必定大小的表比較適用,但當兩個表都很是大時,顯然不管適用哪一種都會對計算內存形成很大壓力。這是由於join時二者採起的都是hash join,是將一側的數據徹底加載到內存中,使用hash code取join keys值相等的記錄進行鏈接。

當兩個表都很是大時,SparkSQL採用了一種全新的方案來對錶進行Join,即Sort Merge Join。這種實現方式不用將一側數據所有加載後再進星hash join,但須要在join前將數據排序,以下圖所示:

能夠看到,首先將兩張表按照join keys進行了從新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區後對每一個分區內的數據進行排序,排序後再對相應的分區內的記錄進行鏈接,以下圖示:

看着很眼熟吧?也很簡單,由於兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;若是不一樣,左邊小就繼續取左邊,反之取右邊。

能夠看出,不管分區有多大,Sort Merge Join都不用把某一側的數據所有加載到內存中,而是即用即取即丟,從而大大提高了大數據量下sql join的穩定性。

SparkSQL對兩張大表join採用了全新的算法-sort-merge join,以下圖所示,整個過程分爲三個步驟:

  1. shuffle階段:將兩張大表根據join key進行從新分區,兩張表數據會分佈到整個集羣,以便分佈式並行處理;
  2. sort階段:對單個分區節點的兩表數據,分別進行排序;
  3. merge階段:對排好序的兩張分區表數據執行join操做。join操做很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,不然取更小一邊,見下圖示意:

通過上文的分析,能夠明確每種Join算法都有本身的適用場景,數據倉庫設計時最好避免大表與大表的join查詢,SparkSQL也能夠根據內存資源、帶寬資源適量將參數spark.sql.autoBroadcastJoinThreshold調大,讓更多join實際執行爲broadcast hash join。

相關文章
相關標籤/搜索