當前SparkSQL支持三種Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前二者歸根到底都屬於hash join,只不過在hash join以前須要先shuffle仍是先broadcast。node
選擇思路大概是:大表與小表進行join會使用broadcast hash join,一旦小表稍微大點再也不適合廣播分發就會選擇shuffle hash join,最後,兩張大表的話無疑選擇sort merge join。算法
將小錶轉換成Hash Table,用大表進行遍歷,對每一個元素去Hash Table裏查看是否存在,存在則進行jion。sql
先來看看這樣一條SQL語句:select * from order,item where item.id = order.i_id
,很簡單一個Join節點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。如今假設這個Join採用的是hash join算法,整個過程會經歷三步:數據庫
肯定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構建Hash Table,而Probe Table使用join key進行探測,探測成功就能夠join在一塊兒。一般狀況下,小表會做爲Build Table,大表做爲Probe Table。此事例中item爲Build Table,order爲Probe Table。緩存
構建Hash Table:依次讀取Build Table(item)的數據,對於每一行數據根據join key(item.id)進行hash,hash到對應的Bucket,生成hash table中的一條記錄。數據緩存在內存中,若是內存放不下須要dump到外存。網絡
探測(Probe):再依次掃描Probe Table(order)的數據,使用相同的hash函數映射Hash Table中的記錄,映射成功以後再檢查join條件(item.id = order.i_id),若是匹配成功就能夠將二者join在一塊兒。併發
基本流程能夠參考上圖,這裏有兩個小問題須要關注:分佈式
hash join性能如何?很顯然,hash join基本都只掃描兩表一次,能夠認爲o(a+b)函數
爲何Build Table選擇小表?道理很簡單,由於構建的Hash Table最好能所有加載在內存,效率最高;這也決定了hash join算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用;oop
上文說過,hash join是傳統數據庫中的單機join算法,在分佈式環境下須要通過必定的分佈式改造,說到底就是儘量利用分佈式計算資源進行並行化計算,提升整體效率。hash join分佈式改造通常有兩種經典方案:
broadcast hash join:將其中一張小表廣播分發到另外一張大表所在的分區節點上,分別併發地與其上的分區記錄進行hash join。broadcast適用於小表很小,能夠直接廣播的場景。
shuffler hash join:一旦小表數據量較大,此時就再也不適合進行廣播分發。這種狀況下,能夠根據join key相同必然分區相同的原理,將兩張表分別按照join key進行從新組織分區,這樣就能夠將join分而治之,劃分爲不少小join,充分利用集羣資源並行化。
broadcast hash join能夠分爲兩步:
broadcast階段:將小表廣播分發到大表所在的全部主機。廣播算法能夠有不少,最簡單的是先發給driver,driver再統一分發給全部executor;要不就是基於bittorrete的p2p思路;
hash join階段:在每一個executor上執行單機版hash join,小表映射,大表試探;
SparkSQL規定broadcast hash join執行的基本條件爲被廣播小表必須小於參數spark.sql.autoBroadcastJoinThreshold
,默認爲10M。
BroadcastNestedLoopJoin
cross jion
在進行笛卡爾集運算時使用了嵌套雲環的jion方式,也就是咱們經常使用的兩個for循環嵌套進行判斷。
在大數據條件下若是一張表很小,執行join操做最優的選擇無疑是broadcast hash join,效率最高。可是一旦小表數據量增大,廣播所需內存、帶寬等資源必然就會太大,broadcast hash join就再也不是最優方案。此時能夠按照join key進行分區,根據key相同必然分區相同的原理,就能夠將大表join分而治之,劃分爲不少小表的join,充分利用集羣資源並行化。以下圖所示,shuffle hash join也能夠分爲兩步:
shuffle階段:分別將兩個表按照join key進行分區,將相同join key的記錄重分佈到同一節點,兩張表的數據會被重分佈到集羣中全部節點。這個過程稱爲shuffle
hash join階段:每一個分區節點上的數據單獨執行單機hash join算法。
shuffle階段:將兩張大表根據join key進行從新分區,兩張表數據會分佈到整個集羣,以便分佈式並行處理
sort階段:對單個分區節點的兩表數據,分別進行排序
merge階段:對排好序的兩張分區表數據執行join操做。join操做很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出(相比hash jion解決了大表不能所有hash到內存中的問題)
仔細分析的話會發現,sort-merge join的代價並不比shuffle hash join小,反而是多了不少。那爲何SparkSQL還會在兩張大表的場景下選擇使用sort-merge join算法呢?這和Spark的shuffle實現有關,目前spark的shuffle實現都適用sort-based shuffle算法,所以在通過shuffle以後partition數據都是按照key排序的。所以理論上能夠認爲數據通過shuffle以後是不須要sort的,能夠直接merge。
hash jion中使用了謂詞布隆過濾器進行下推實現了FR(Runtime Filter)
, 對jion操做進一步作了優化。
因爲spark CBO分析的不許確問題致使broadcastjoin 常常出現亂廣播的情形。