Preface
As is well known, the Catalyst Optimizer is the core of Spark SQL. It is responsible for converting a SQL statement into the final physical execution plan, and to a large extent determines how well the SQL performs.
When Catalyst generates the Physical Plan from the Optimized Logical Plan, it determines the join strategy according to:
```scala
abstract class SparkStrategies extends QueryPlanner[SparkPlan]
```
Specifically, JoinSelection in this class pattern-matches a series of rules in order to determine the final join execution strategy, and the candidate strategies are tried in descending order of execution efficiency.
Before looking at how the join strategy is chosen, let's first go over a few prerequisites:
1. Choosing the build table
The first step of a Hash Join is to build a hash table from the smaller of the two tables. This smaller table is called the build table, and the larger one is called the probe table, because the hash table built from the small table is used to "probe" it. Source:
```scala
/*
 * Conditions for the left table to serve as the build table; the join type must be:
 * 1. InnerLike: currently covers inner join and cross join
 * 2. RightOuter: right outer join
 */
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

/*
 * Conditions for the right table to serve as the build table; the join type must be
 * (the first group covers most SQL written in day-to-day business development):
 * 1. InnerLike, LeftOuter (left outer join), LeftSemi (left semi join), LeftAnti (left anti join)
 * 2. ExistenceJoin: only used in the end of optimizer and physical plans,
 *    we will not generate SQL for this join type
 */
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}
```
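The two rules above can be replayed outside of Spark with a minimal stand-in for Catalyst's JoinType hierarchy. The type names below mirror Catalyst's, but the ADT itself is a simplification for illustration, not the real class hierarchy:

```scala
// Simplified stand-in for Catalyst's JoinType hierarchy (illustration only)
sealed trait JoinType
case object Inner extends JoinType
case object Cross extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object FullOuter extends JoinType

// InnerLike covers inner join and cross join
def isInnerLike(jt: JoinType): Boolean = jt == Inner || jt == Cross

// The left side may be the build table for inner-like and right outer joins
def canBuildLeft(jt: JoinType): Boolean = isInnerLike(jt) || jt == RightOuter

// The right side may be the build table for inner-like and all left-* joins
def canBuildRight(jt: JoinType): Boolean =
  isInnerLike(jt) || jt == LeftOuter || jt == LeftSemi || jt == LeftAnti
```

Note that a full outer join matches neither rule, which is why it can never use a hash-based build side.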
2. Which tables can be broadcast
If a table's size is less than or equal to the value of spark.sql.autoBroadcastJoinThreshold (default 10MB), that table can be broadcast. Source:
```scala
private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  buildLeft || buildRight
}

private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  // eventually calls broadcastSide
  broadcastSide(buildLeft, buildRight, left, right)
}
```
Besides the size condition above, we can also explicitly specify the table to broadcast with a hint in the Spark SQL statement (/*+ BROADCAST(small_table) */). Source:
```scala
private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  buildLeft || buildRight
}

private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  // eventually calls broadcastSide
  broadcastSide(buildLeft, buildRight, left, right)
}
```
Whether broadcasting is triggered by table size or by an explicit hint, broadcastSide is ultimately called to decide which table to broadcast:
```scala
private def broadcastSide(
    canBuildLeft: Boolean,
    canBuildRight: Boolean,
    left: LogicalPlan,
    right: LogicalPlan): BuildSide = {

  def smallerSide =
    if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft

  if (canBuildRight && canBuildLeft) {
    // If both tables qualify as the build table, pick the one with the smaller
    // physical size according to the table statistics (even if both carry a hint)
    smallerSide
  } else if (canBuildRight) {
    // Otherwise, the right table is checked first: if it qualifies as the build
    // side it is broadcast; only then is the left table considered
    BuildRight
  } else if (canBuildLeft) {
    BuildLeft
  } else {
    // If neither table qualifies as the build table, pick the side with the smaller
    // physical size; currently this is mainly used for broadcast nested loop join
    smallerSide
  }
}
```
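To see concretely why a hint does not guarantee which table gets broadcast, the decision can be replayed with plain numbers. This is a simplified sketch: the Long sizes stand in for plan.stats.sizeInBytes, and the boolean flags stand in for the hint/size checks:

```scala
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Mirrors broadcastSide: when both sides qualify, the smaller one wins
def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean,
                  leftSize: Long, rightSize: Long): BuildSide = {
  def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
  if (canBuildRight && canBuildLeft) smallerSide
  else if (canBuildRight) BuildRight
  else if (canBuildLeft) BuildLeft
  else smallerSide
}
```

For example, if both sides are hinted, the physically smaller side is broadcast regardless of which table the user hinted; if only one side qualifies, that side is broadcast even when it is the larger one.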
As the source shows, even when the user specifies a broadcast hint, the hinted table is not necessarily the one broadcast at execution time.
3. Whether a local HashMap can be built
This check applies to Shuffle Hash Join. Source:
```scala
// A single partition of the logical plan is small enough to build a hash table.
// Note: this requires a fixed number of partitions; if the number of partitions
// is dynamic, additional conditions must hold.
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  // A local HashMap can be built when the plan's physical size is smaller than
  // spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions (default 200)
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
```
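With the default configuration, this ceiling can be worked out directly. The sketch below assumes the defaults mentioned above (a 10MB broadcast threshold and 200 shuffle partitions); the intuition is that if the whole plan stays under threshold * partitions, each partition is on average under the broadcast threshold:

```scala
// Assumed defaults: spark.sql.autoBroadcastJoinThreshold = 10MB,
// spark.sql.shuffle.partitions = 200
val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024 // 10 MB in bytes
val numShufflePartitions: Long = 200L

// Total ceiling for building a local HashMap: threshold * partitions
val ceiling: BigInt = BigInt(autoBroadcastJoinThreshold) * numShufflePartitions

// Mirrors canBuildLocalHashMap with a plain size instead of plan.stats.sizeInBytes
def canBuildLocalHashMap(sizeInBytes: BigInt): Boolean = sizeInBytes < ceiling
```

Under these defaults the ceiling is roughly 2GB, so a plan around 1GB still qualifies while a 4GB plan does not.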
As we know, Spark SQL currently implements three main join strategies: Broadcast Hash Join, Shuffle Hash Join, and Sort Merge Join. So when Catalyst processes a SQL statement, what rules does it follow to select a join strategy?
1. Broadcast Hash Join
Whether this strategy applies is decided mainly by hints and table size.
```scala
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
```
2. Shuffle Hash Join
Shuffle Hash Join is selected only when all of the following conditions hold:
spark.sql.join.preferSortMergeJoin is false, i.e., Shuffle Hash Join takes priority over Sort Merge Join
the right table (or the left table) can serve as the build table
a local HashMap can be built for that table
taking the right table as an example, its logical plan size must be much smaller than the left table's (at most one third of it by default)
These conditions are checked against the right table first.
```scala
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildRight(joinType) &&
      canBuildLocalHashMap(right) && muchSmaller(right, left) ||
      !RowOrdering.isOrderable(leftKeys) =>
  Seq(joins.ShuffledHashJoinExec(
    leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildLeft(joinType) &&
      canBuildLocalHashMap(left) && muchSmaller(left, right) ||
      !RowOrdering.isOrderable(leftKeys) =>
  Seq(joins.ShuffledHashJoinExec(
    leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
```
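The 3x rule and the other guard conditions can be checked with plain sizes. This is a sketch only: the boolean parameters stand in for the canBuildRight / canBuildLocalHashMap checks, and the BigInt sizes for plan.stats.sizeInBytes (note that preferSortMergeJoin defaults to true, so Shuffle Hash Join requires explicitly turning it off):

```scala
// muchSmaller: the build side must be at most one third of the probe side
def muchSmaller(aSize: BigInt, bSize: BigInt): Boolean = aSize * 3 <= bSize

// Combined guard for choosing Shuffle Hash Join with the right table as build side
def chooseShuffleHashRight(preferSortMergeJoin: Boolean,
                           canBuildRight: Boolean,
                           canBuildLocalHashMap: Boolean,
                           rightSize: BigInt, leftSize: BigInt): Boolean =
  !preferSortMergeJoin && canBuildRight && canBuildLocalHashMap &&
    muchSmaller(rightSize, leftSize)
```

For example, a 100-byte right plan against a 300-byte left plan just satisfies the 3x rule, while 101 bytes against 300 does not.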
Even when the conditions above are not met, Shuffle Hash Join is still chosen if the join keys cannot be sorted, because Sort Merge Join is then impossible.
```scala
// !RowOrdering.isOrderable(leftKeys)
def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
```
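A simplified view of what "orderable" means can be sketched with stand-in data types. The DataType ADT below is illustrative, not Catalyst's real one; the assumption it encodes is that atomic types and arrays of orderable types sort, while map types do not, which is what rules out Sort Merge Join on map-typed keys:

```scala
// Illustrative stand-ins for a few Catalyst data types
sealed trait DataType
case object IntegerType extends DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class MapType(keyType: DataType, valueType: DataType) extends DataType

// Simplified isOrderable: atomic types and arrays of orderable element types
// are orderable; map types are not
def isOrderable(dt: DataType): Boolean = dt match {
  case IntegerType | StringType => true
  case ArrayType(et)            => isOrderable(et)
  case _: MapType               => false
}

// Mirrors the Seq[Expression] form: all join keys must be orderable
def allOrderable(dts: Seq[DataType]): Boolean = dts.forall(isOrderable)
```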
3. Sort Merge Join
If neither of the first two strategies (Broadcast Hash Join and Shuffle Hash Join) qualifies, and the join keys are orderable, Sort Merge Join is selected.
```scala
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if RowOrdering.isOrderable(leftKeys) =>
  joins.SortMergeJoinExec(
    leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
```
4. Without joining keys
Broadcast Hash Join, Shuffle Hash Join, and Sort Merge Join all fall under the classic ExtractEquiJoinKeys pattern (equi-join conditions).
For joins that do not match ExtractEquiJoinKeys, Catalyst first checks whether either table can be broadcast (by hint or by size). If so, it uses BroadcastNestedLoopJoin (BNLJ for short). Anyone familiar with Nested Loop Join will find BNLJ easy to understand; the main difference is that BNLJ broadcasts one side of the join.
Source:
```scala
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
```
If neither table can be broadcast, there are two further cases:
If the join type is InnerLike (introduced above), a Cartesian product is performed directly on the two tables.
If that case does not apply either, the fallback is to broadcast whichever of the two tables has the smaller physical size; the join strategy is still BNLJ.
Source:
```scala
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
  val buildSide = broadcastSide(
    left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
  // This join could be very slow or OOM
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
```
Clearly, whether the final strategy ends up being a Cartesian product or BNLJ, execution will be inefficient; in real applications these cases should be avoided whenever possible.
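Putting all the rules together, the overall selection order the article walks through can be summarized as a single decision function. This is a simplified sketch over boolean flags: the real JoinSelection matches on the logical plan tree, and the field names below are assumptions introduced for illustration:

```scala
// Inputs summarizing a two-table join candidate (illustrative only)
case class JoinCandidate(
  equiJoinKeys: Boolean,      // matched by ExtractEquiJoinKeys?
  canBroadcast: Boolean,      // either side broadcastable by hint or size
  preferSortMergeJoin: Boolean,
  canUseShuffleHash: Boolean, // build-side + local-HashMap + muchSmaller checks
  keysOrderable: Boolean,
  innerLike: Boolean)

// Strategy names, tried in the priority order described above
def selectStrategy(c: JoinCandidate): String =
  if (c.equiJoinKeys) {
    if (c.canBroadcast) "BroadcastHashJoin"
    else if (!c.preferSortMergeJoin && c.canUseShuffleHash || !c.keysOrderable) "ShuffledHashJoin"
    else "SortMergeJoin"
  } else {
    if (c.canBroadcast) "BroadcastNestedLoopJoin"
    else if (c.innerLike) "CartesianProduct"
    else "BroadcastNestedLoopJoin" // smaller side broadcast; may be very slow or OOM
  }
```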
Recommended articles:
Parquet conversion between SparkSQL and the Hive metastore
Generating HFiles with Spark and importing data into HBase via BulkLoad
Handling the small-files problem in Spark SQL