Join有inner
,leftouter
,rightouter
,fullouter
,leftsemi
,leftanti
六種類型,對單獨版本的Join操做,能夠將問題表述爲:算法
IterA,IterB爲兩個Iterator,根據規則A將兩個Iterator中相應的Row進行合併,而後按照規則B對合並後Row進行過濾。
好比Inner_join,它的合併規則A爲:對IterA中每一條記錄,生成一個key,並利用該key從IterB的Map集合中獲取到相應記錄,並將它們進行合併;而對於規則B能夠爲任意過濾條件,好比IterA和IterB任何兩個字段進行比較操做。sql
對於IterA和IterB,當咱們利用iterA中key去IterB中進行一一匹配時,咱們稱IterA爲streamedIter
,IterB爲BuildIter
或者hashedIter
。即咱們流式遍歷streamedIter
中每一條記錄,去hashedIter
中去查找相應匹配的記錄。apache
而這個查找過程當中,即爲Build
過程,每一次Build
操做的結果即爲一條JoinRow(A,B)
,其中JoinRow(A)
來自streamedIter
,JoinRow(B)
來自BuildIter
,此時這個過程爲BuildRight
,而若是JoinRow(B)
來自streamedIter
,JoinRow(A)
來自BuildIter
,即爲BuildLeft
,分佈式
有點拗口!那麼爲何要去區分BuildLeft
和BuildRight
呢?對於leftouter
,rightouter
,leftsemi
,leftanti
,它們的Build類型是肯定,即left*
爲BuildRight
,right*
爲BuildLeft
類型,可是對於inner
操做,BuildLeft
和BuildRight
兩種均可以,並且選擇不一樣,可能有很大性能區別:ide
BuildIter也稱爲hashedIter,即須要將BuildIter構建爲一個內存Hash,從而加速Build的匹配過程;此時若是BuildIter和streamedIter大小相差較大,顯然利用小的來創建Hash,內存佔用要小不少!性能
總結一下:Join即由下面幾部分組成:ui
trait Join { val joinType: JoinType //Join類型 val streamedPlan: SparkPlan //用於生成streamedIter val buildPlan: SparkPlan //用於生成hashedIter val buildSide: BuildSide //BuildLeft或BuildRight val buildKeys: Seq[Expression] //用於從streamedIter中生成buildKey的表達式 val streamedKeys: Seq[Expression] //用於從hashedIter中生成streamedKey的表達式 val condition: Option[Expression]//對joinRow進行過濾 }
注:對於fullouter,IterA和IterB同時爲streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter進行leftouter,而後再用先IterB=streamedIter,IterA=hashedIter進行leftouter,再把兩次結果進行合併。spa
若是匹配成功,即構建多個JoinRow,不然返回emptycode
streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq.empty } }
若是匹配成功,即構建多個JoinRow,不然返回JoinRow的Build部分爲Null排序
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) } else { Seq(joinRow.withRight(nullRow)) } }
若是匹配成功,即構建多個JoinRow,不然返回JoinRow的Build部分爲Null
val nullRow = new NullRow() streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withRight(srow)//注意與LeftOutJoin的區別 val matches = hashedIter.get(buildKeys(srow)) if (matches != null) { matches.map(joinRow.withLeft(_)).filter(condition) } else { Seq(joinRow.withLeft(nullRow)) } }
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { false //沒有找到匹配項 } else{ if(condition.isEmpty == false) { //須要對`假想`後joinrow進行判斷 val joinRow = new JoinedRow joinRow.withLeft(srow) ! matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { true } } }
LeftSemi從邏輯上來講,它即爲In判斷。
它不是返回JoinRow,而是返回srow
streamIter.filter{ srow => val matches = hashedIter.get(buildKeys(srow)) if(matches == null) { true //沒有找到匹配項 } else{ if(condition.isEmpty == false) { //須要對`假想`後joinrow進行判斷 val joinRow = new JoinedRow joinRow.withLeft(srow) matches.map(joinRow.withLeft(_)).filter(condition).isEmpty } else { false } } }
上面描述的Join是須要將BuildIter
在內存中構建爲hashedIter
,從而加速匹配過程,所以咱們也將這個Join稱爲HashJoin。可是創建一個Hash表須要佔用大量的內存。
那麼問題來:若是咱們的Iter太大,沒法創建Hash表怎麼吧?在分佈式Join計算下,Join過程當中發生在Shuffle階段,若是一個數據集的Key存在數據偏移,很容易出現一個BuildIter
超過內存大小,沒法完成Hash表的創建,進而致使HashJoin失敗,那麼怎麼辦?
在HashJoin過程當中,針對
BuildIter
創建hashedIter
是爲了加速匹配過程當中。匹配查找除了創建Hash表這個方法之外,將streamedIter和BuildIter進行排序,也是一個加速匹配過程,即咱們這裏說的sortJoin。
排序不也是須要內存嗎?是的,首先排序佔用內存比創建一個hash表要小不少,其次排序若是內存不夠,能夠將一部分數據Spill到磁盤,而Hash爲全內存,若是內存不夠,將會致使整個Shuffle失敗。
下面以InnerJoin的SortJoin實現爲例子,講述它與HashJoin的區別:
利用streamIter中每一個srow,從BuildIter中順序查找,因爲兩邊都是有序的,因此查找代價很小。
val buildIndex = 0 streamIter.flatMap{ srow => val joinRow = new JoinedRow joinRow.withLeft(srow) //順序查找 val matches = BuildIter.search(buildKeys(srow), buildIndex) if (matches != null) { matches.map(joinRow.withRight(_)).filter(condition) buildIndex += matches.length } else { Seq.empty } }
對於FullOuter
Join,若是採用HashJoin方式來實現,代價較大,須要創建雙向的Hash表,而基於SortJoin,它的代價與其餘幾種Join相差不大,所以`FullOuter默認都是基於SortJon來實現。
Spark針對Join提供了分佈式實現,可是Join操做本質上也是單機進行,怎麼理解?若是要對兩個數據集進行分佈式Join,Spark會先對兩個數據集進行Exchange
,即進行ShuffleMap操做,將Key相同數據分到一個分區中,而後在ShuffleFetch過程當中利用HashJoin/SortJoin單機版算法來對兩個分區進行Join操做。
另外若是Build端的整個數據集(非一個iter)大小較小,能夠將它進行Broadcast操做,從而節約Shuffle的開銷。
所以Spark支持ShuffledHashJoinExec
,SortMergeJoinExec
,BroadcastHashJoinExec
三種Join算法,那麼它怎麼進行選擇的呢?
spark.sql.autoBroadcastJoinThreshold
,默認10M,那麼優先進行BroadcastHashJoinExecspark.sql.join.preferSortMergeJoin
爲True,那麼優先選擇SortMergeJoinExecShuffledHashJoinExec
了
這一塊邏輯都在org.apache.spark.sql.execution.JoinSelection
中描述。ps:Spark也對Without joining keys
的Join進行支持,可是不在咱們此次討論範圍中。
BroadcastHashJoinExec
val p = spark.read.parquet("/Users/p.parquet") val p1 = spark.read.parquet("/Users/p1.parquet") p.joinWith(p1, p("to_module") === p1("to_module"),"inner") 此時因爲p和p1的大小都較小,它會默認選擇BroadcastHashJoinExec == Physical Plan == BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight :- Project p :- Project p1
SortMergeJoinExec
val p = spark.read.parquet("/Users/p.parquet") val p1 = spark.read.parquet("/Users/p1.parquet") p.joinWith(p1, p("to_module") === p1("to_module"),"fullouter") fullouterJoin不支持Broadcast和ShuffledHashJoinExec,所以爲ShuffledHashJoinExec == Physical Plan == SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter :- Project p :- Project p1
因爲ShuffledHashJoinExec通常狀況下,不會被選擇,它的條件比較苛責。
//首先不能進行Broadcast! private def canBroadcast(plan: LogicalPlan): Boolean = { plan.statistics.isBroadcastable || plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold(10M) } //其次spark.sql.join.preferSortMergeJoin必須設置false //而後build端能夠放的進內存! private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = { plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions } //最後build端和stream端大小必須相差3倍!不然使用sort性能要好。 private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes } //或者RowOrdering.isOrderable(leftKeys)==false