數據分析中將兩個數據集進行 Join 操做是很常見的場景。在 Spark 的物理計劃(physical plan)階段,Spark 的 JoinSelection 類會根據 Join hints 策略、Join 表的大小、 Join 是等值 Join(equi-join) 仍是不等值(non-equi-joins)以及參與 Join 的 key 是否能夠排序等條件來選擇最終的 Join 策略(join strategies),最後 Spark 會利用選擇好的 Join 策略執行最終的計算。當前 Spark(Apache Spark 3.0)一共支持五種 Join 策略:html
- Broadcast hash join (BHJ)
- Shuffle hash join(SHJ)
- Shuffle sort merge join (SMJ)
- Shuffle-and-replicate nested loop join,又稱笛卡爾積(Cartesian product join).)
- Broadcast nested loop join (BNLJ)
其中 BHJ 和 SMJ 這兩種 Join 策略是咱們運行 Spark 做業最多見的。JoinSelection 會先根據 Join 的 Key 爲等值 Join 來選擇 Broadcast hash join、Shuffle hash join 以及 Shuffle sort merge join 中的一個;若是 Join 的 Key 爲不等值 Join 或者沒有指定 Join 條件,則會選擇 Broadcast nested loop join 或 Shuffle-and-replicate nested loop join。算法
不一樣的 Join 策略在執行上效率差異很大,做爲一個大數據分析師,瞭解每種 Join 策略的執行過程和適用條件是頗有必要的。本文將給你們簡單介紹一下這五種 Join 策略的選擇條件和執行過程。sql
文章目錄apache
Broadcast Hash Join (BHJ)
BHJ 又稱 map-side-only join,從名字能夠看出,Join 是在 map 端進行的。這種 Join 要求一張表很小,小到足以將表的數據所有放到 Driver 和 Executor 端的內存中,而另一張表很大。微信
Broadcast Hash Join 的實現是將小表的數據廣播(broadcast)到 Spark 全部的 Executor 端,這個廣播過程和咱們本身去廣播數據沒什麼區別,先利用 collect 算子將小表的數據從 Executor 端拉到 Driver 端,而後在 Driver 端調用 sparkContext.broadcast 廣播到全部 Executor 端;而後在 Executor 端這個廣播出去的數據會和大表進行 Join 操做,這種 Join 策略避免了 Shuffle 操做。通常而言,Broadcast Hash Join 會比其餘 Join 策略執行的要快,但這個也不是必定的,感興趣的能夠看下過往記憶大數據的 《Spark SQL 中 Broadcast Join 必定比 Shuffle Join 快?那你就錯了。》 文章。app
Broadcast Hash Join 的適用條件
使用這個 Join 策略必須知足如下條件:ide
- 小表的數據必須很小,能夠經過
spark.sql.autoBroadcastJoinThreshold
參數來配置,默認是 10MB,若是你的內存比較大,能夠將這個閾值適當加大;若是將spark.sql.autoBroadcastJoinThreshold
參數設置爲 -1,能夠關閉 BHJ; - 只能用於等值 Join,不要求參與 Join 的 keys 可排序;
- 除了 full outer joins ,支持全部的 Join 類型。
Broadcast Hash Join 使用例子
假設咱們有如下 Spark 程序:oop
scala> val iteblogDF = Seq( | (0, "https://www.iteblog.com"), | (1, "iteblog_hadoop"), | (2, "iteblog") | ).toDF("id", "info") iteblogDF: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val r = iteblogDF.join(iteblogDF, Seq("id"), "inner") r: org.apache.spark.sql.DataFrame = [id: int, info: string ... 1 more field] scala> r.explain == Physical Plan == *(1) Project [id#7, info#8, info#12] +- *(1) BroadcastHashJoin [id#7], [id#11], Inner, BuildRight :- *(1) LocalTableScan [id#7, info#8] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#15] +- LocalTableScan [id#11, info#12] scala> r.show(false) +---+-----------------------+-----------------------+ |id |info |info | +---+-----------------------+-----------------------+ |0 |https://www.iteblog.com|https://www.iteblog.com| |1 |iteblog_hadoop |iteblog_hadoop | |2 |iteblog |iteblog | +---+-----------------------+-----------------------+
從 r.explain
結果就能夠看出,裏面使用了 BroadcastHashJoin,物理計劃以下測試
Spark 中 Broadcast Hash Join 是在 BroadcastHashJoinExec 類裏面實現的。大數據
Shuffle Hash Join(SHJ)
前面介紹的 Broadcast hash join 要求參與 Join 的一張表大小小於 spark.sql.autoBroadcastJoinThreshold
配置的值,可是當咱們表的數據比這個大,並且這張表的數據又不適合使用廣播,這個時候就能夠考慮使用 Shuffle hash join。
Shuffle hash join 一樣是在大表和小表進行 Join 的時候選擇的一種策略,它的計算思想是:把大表和小表按照相同的分區算法和分區數進行分區(根據參與 Join 的 keys 進行分區),這樣就保證了 hash 值同樣的數據都分發到同一個分區中,而後在同一個 Executor 中兩張表 hash 值同樣的分區就能夠在本地進行 hash Join 了。在進行 Join 以前,還會對小表 hash 完的分區構建 hash map。 Shuffle hash join 利用了分治思想,把大問題拆解成小問題去解決。
Shuffle Hash Join 的適用條件
要啓用 Shuffle Hash Join 必須知足如下幾個條件:
- 僅支持等值 Join,不要求參與 Join 的 Keys 可排序;
spark.sql.join.preferSortMergeJoin
參數必須設置爲 false,參數是從 Spark 2.0.0 版本引入的,默認值爲 true,也就是默認狀況下選擇 Sort Merge Join;- 小表的大小(plan.stats.sizeInBytes)必須小於
spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions
;並且小表大小(stats.sizeInBytes)的三倍必須小於等於大表的大小(stats.sizeInBytes),也就是 a.stats.sizeInBytes * 3 < = b.stats.sizeInBytes
Shuffle Hash Join 使用例子
// 由於咱們下面測試數據都很小,因此咱們先把 BroadcastJoin 關閉 scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1) // 爲了啓用 Shuffle Hash Join 必須將 spark.sql.join.preferSortMergeJoin 設置爲 false scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", false) scala> val iteblogDF1 = Seq( | (2, "iteblog") | ).toDF("id", "info") iteblogDF1: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val iteblogDF2 = Seq( | (0, "https://www.iteblog.com"), | (1, "iteblog_hadoop"), | (2, "iteblog") | ).toDF("id", "info") iteblogDF2: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val r = iteblogDF1.join(iteblogDF, Seq("id"), "inner") r: org.apache.spark.sql.DataFrame = [id: int, info: string ... 1 more field] scala> r.explain == Physical Plan == *(1) Project [id#52, info#53, info#37] +- ShuffledHashJoin [id#52], [id#36], Inner, BuildLeft :- Exchange hashpartitioning(id#52, 200), true, [id=#172] : +- LocalTableScan [id#52, info#53] +- Exchange hashpartitioning(id#36, 200), true, [id=#173] +- LocalTableScan [id#36, info#37] scala> r.show(false) +---+-------+-------+ |id |info |info | +---+-------+-------+ |2 |iteblog|iteblog| +---+-------+-------+
從 r.explain
結果就能夠看出,裏面使用了 ShuffledHashJoin,物理計劃以下
從上圖能夠看出,在進行 ShuffledHashJoin 的時候 Spark 構建了build hash map,因此若是小表分區後的數據還比較大,可能會參數 OOM 的問題。在 Spark 中,ShuffledHashJoin 的實如今 ShuffledHashJoinExec 類裏面,感興趣的同窗能夠去看下。
Shuffle Sort Merge Join (SMJ)
前面兩種 Join 策略對錶的大小都有條件的,若是參與 Join 的表都很大,這時候就得考慮用 Shuffle Sort Merge Join 了。
Shuffle Sort Merge Join 的實現思想::也是對兩張表參與 Join 的 Keys 使用相同的分區算法和分區數進行分區,目的就是保證相同的 Keys 都落到相同的分區裏面。分區完以後再對每一個分區按照參與 Join 的 Keys 進行排序,最後 Reduce 端獲取兩張表相同分區的數據進行 Merge Join,也就是 Keys 相同說明 Join 上了。
Shuffle Sort Merge Join 的適用條件
Shuffle Sort Merge Join 並非必定就使用的,也須要知足如下條件:
- 僅支持等值 Join,而且要求參與 Join 的 Keys 可排序;
Shuffle Sort Merge Join 使用例子
// 由於咱們下面測試數據都很小,因此咱們先把 BroadcastJoin 關閉 scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1) scala> val iteblogDF1 = Seq( | (0, "111"), | (1, "222"), | (2, "333") | ).toDF("id", "info") iteblogDF1: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val iteblogDF2 = Seq( | (0, "https://www.iteblog.com"), | (1, "iteblog_hadoop"), | (2, "iteblog") | ).toDF("id", "info") iteblogDF2: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val r = iteblogDF1.join(iteblogDF2, Seq("id"), "inner") r: org.apache.spark.sql.DataFrame = [id: int, info: string ... 1 more field] scala> r.explain == Physical Plan == *(3) Project [id#119, info#120, info#131] +- *(3) SortMergeJoin [id#119], [id#130], Inner :- *(1) Sort [id#119 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#119, 200), true, [id=#335] : +- LocalTableScan [id#119, info#120] +- *(2) Sort [id#130 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#130, 200), true, [id=#336] +- LocalTableScan [id#130, info#131] scala> r.show(false) +---+----+-----------------------+ |id |info|info | +---+----+-----------------------+ |1 |222 |iteblog_hadoop | |2 |333 |iteblog | |0 |111 |https://www.iteblog.com| +---+----+-----------------------+
從 r.explain
結果就能夠看出,裏面使用了 SortMergeJoin,物理計劃以下
Spark 裏面的大表 Join 基本上均可以使用 SortMergeJoin 來實現,對應的類爲 SortMergeJoinExec 。
Cartesian product join
和 MySQL 同樣,若是 Spark 中兩張參與 Join 的表沒指定 where 條件(ON 條件)那麼會產生 Cartesian product join,這個 Join 獲得的結果其實就是兩張行數的乘積。
Cartesian product join 的適用條件
必須是 inner Join,其支持等值和不等值 Join。
Cartesian product join 使用例子
// 由於咱們下面測試數據都很小,因此咱們先把 BroadcastJoin 關閉 scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1) scala> val iteblogDF1 = Seq( | (0, "111"), | (1, "222"), | (2, "333") | ).toDF("id", "info") iteblogDF1: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val iteblogDF2 = Seq( | (0, "https://www.iteblog.com"), | (1, "iteblog_hadoop"), | (2, "iteblog") | ).toDF("id", "info") iteblogDF2: org.apache.spark.sql.DataFrame = [id: int, info: string] // 這裏也可使用 val r = iteblogDF1.crossJoin(iteblogDF2) scala> val r = iteblogDF1.join(iteblogDF2, Nil, "inner") r: org.apache.spark.sql.DataFrame = [id: int, info: string ... 2 more fields] scala> r.explain == Physical Plan == CartesianProduct :- LocalTableScan [id#157, info#158] +- LocalTableScan [id#168, info#169] scala> r.show(false) +---+----+---+-----------------------+ |id |info|id |info | +---+----+---+-----------------------+ |0 |111 |0 |https://www.iteblog.com| |0 |111 |1 |iteblog_hadoop | |0 |111 |2 |iteblog | |1 |222 |0 |https://www.iteblog.com| |1 |222 |1 |iteblog_hadoop | |1 |222 |2 |iteblog | |2 |333 |0 |https://www.iteblog.com| |2 |333 |1 |iteblog_hadoop | |2 |333 |2 |iteblog | +---+----+---+-----------------------+
從上面結果能夠看出,Cartesian product join 產生數據的行數是兩表的乘積,當 Join 的表很大時,其效率是很是低下的,因此咱們儘可能不要使用這種 Join。在 Spark 中 Cartesian product join 的實現能夠參見 CartesianProductExec 類。
Broadcast nested loop join (BNLJ)
你能夠把 Broadcast nested loop join 的執行看作下面的計算:
for record_1 in relation_1: for record_2 in relation_2: # join condition is executed
能夠看出 Broadcast nested loop join 在某些狀況會對某張表重複掃描屢次,可見效率很是低下。從名字能夠看出,BNLJ 會根據相關條件對小表進行廣播,以減小表的掃描次數。觸發廣播的須要知足如下三個條件之一:
- right outer join 是會廣播左表;
- left outer, left semi, left anti 或者 existence join 時會廣播右表;
- inner join 的時候兩張表都會廣播。
Broadcast nested loop join 的適用條件
Broadcast nested loop join 支持等值和不等值 Join,支持全部的 Join 類型。
Broadcast nested loop join 使用例子
// 由於咱們下面測試數據都很小,因此咱們先把 BroadcastJoin 關閉 scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1) scala> val iteblogDF1 = Seq( | (0, "111"), | (1, "222"), | (2, "333") | ).toDF("id", "info") iteblogDF1: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val iteblogDF2 = Seq( | (0, "https://www.iteblog.com"), | (1, "iteblog_hadoop"), | (2, "iteblog") | ).toDF("id", "info") iteblogDF2: org.apache.spark.sql.DataFrame = [id: int, info: string] scala> val r = iteblogDF1.join(iteblogDF2, Nil, "leftouter") r: org.apache.spark.sql.DataFrame = [id: int, info: string ... 2 more fields] scala> r.explain == Physical Plan == BroadcastNestedLoopJoin BuildRight, LeftOuter :- LocalTableScan [id#157, info#158] +- BroadcastExchange IdentityBroadcastMode, [id=#516] +- LocalTableScan [id#168, info#169] scala> r.show(false) +---+----+---+-----------------------+ |id |info|id |info | +---+----+---+-----------------------+ |0 |111 |0 |https://www.iteblog.com| |0 |111 |1 |iteblog_hadoop | |0 |111 |2 |iteblog | |1 |222 |0 |https://www.iteblog.com| |1 |222 |1 |iteblog_hadoop | |1 |222 |2 |iteblog | |2 |333 |0 |https://www.iteblog.com| |2 |333 |1 |iteblog_hadoop | |2 |333 |2 |iteblog | +---+----+---+-----------------------+
上面計算的查詢計劃以下:
上面因爲是 LeftOuter Join,因此會對右表進行廣播。Broadcast nested loop join 的實現能夠參見 BroadcastNestedLoopJoinExec。
Spark 如何選擇 Join 策略
Spark 有五種 Join 策略,那麼 Spark 是按照什麼順序來選擇呢?前面咱們也說了,Spark 的 Join 策略是在 JoinSelection 類裏面實現的,關鍵代碼以下:
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) => def createJoinWithoutHint() = { createBroadcastHashJoin( canBroadcast(left) && !hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)), canBroadcast(right) && !hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH))) .orElse { if (!conf.preferSortMergeJoin) { createShuffleHashJoin( canBuildLocalHashMap(left) && muchSmaller(left, right), canBuildLocalHashMap(right) && muchSmaller(right, left)) } else { None } } .orElse(createSortMergeJoin()) .orElse(createCartesianProduct()) .getOrElse { // This join could be very slow or OOM val buildSide = getSmallerSide(left, right) Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition)) } } createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint))) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) case logical.Join(left, right, joinType, condition, hint) => def createJoinWithoutHint() = { createBroadcastNLJoin(canBroadcast(left), canBroadcast(right)) .orElse(createCartesianProduct()) .getOrElse { // This join could be very slow or OOM Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), desiredBuildSide, joinType, condition)) } } createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) // --- Cases where this strategy does not apply --------------------------------------------- case _ => Nil }
因爲 Spark 的計算引擎優化器不是萬能的,有些場景下會選擇錯誤的 Join 策略,因此 Spark 2.4 & Spark 3.0 引入了 Join hint,也就是用戶能夠本身選擇 Join 策略。上面的代碼能夠看出,用戶指定的 Join hint 優先級最高。從代碼中能夠看出 Spark 3.0 是按照下面下面順序來選擇 Join 策略的:
先判斷是否是等值 Join,那麼是按照下面順序選擇 Join 策略:
- 用戶是否是指定了 BROADCAST hint (BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一個),若是指定了,那就用 Broadcast Hash Join;
- 用戶是否是指定了 SHUFFLE MERGE hint (SHUFFLE_MERGE、MERGE 以及 MERGEJOIN 中的一個),若是指定了,那就用 Shuffle sort merge join;
- 用戶是否是指定了 Shuffle Hash Join hint (SHUFFLE_HASH),若是指定了,那就用 Shuffle Hash Join;
- 用戶是否是指定了 shuffle-and-replicate nested loop join hint (SHUFFLE_REPLICATE_NL),若是指定了,那就用 Cartesian product join;
- 若是用戶沒有指定任何 Join hint,那根據 Join 的適用條件按照 Broadcast Hash Join -> Shuffle Hash Join -> Sort Merge Join ->Cartesian Product Join -> Broadcast Nested Loop Join 順序選擇 Join 策略
若是是不等值 Join,那麼是按照下面順序選擇 Join 策略:
- 用戶是否是指定了 BROADCAST hint (BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一個),若是指定了,那就廣播對應的表,並選擇 Broadcast Nested Loop Join;
- 用戶是否是指定了 shuffle-and-replicate nested loop join hint (SHUFFLE_REPLICATE_NL),若是指定了,那就用 Cartesian product join;
- 若是用戶沒有指定任何 Join hint,那根據 Join 的適用條件按照 Broadcast Nested Loop Join ->Cartesian Product Join -> Broadcast Nested Loop Join 順序選擇 Join 策略
轉載本文請加上:轉載自過往記憶(https://www.iteblog.com/)
本文連接: 【每一個 Spark 工程師都應該知道的五種 Join 策略】(https://www.iteblog.com/archives/9870.html)