Spark 學習(十) Spark 的三中Join

一,簡介sql

二,Broadcast Hash Join 數據庫

三,Shuffle Hash Joinapache

四,Sort Merge Joinapp

五,示例分佈式

 

 

正文

一,簡介

  Join是SQL語句中的經常使用操做,良好的表結構可以將數據分散在不一樣的表中,使其符合某種範式,減小表冗餘、更新容錯等。而創建表和表之間關係的最佳方式就是Join操做。性能

SparkSQL做爲大數據領域的SQL實現,天然也對Join操做作了很多優化,今天主要看一下在SparkSQL中對於Join,常見的3種實現。大數據

二,Broadcast Hash Join

  你們知道,在數據庫的常見模型中(好比星型模型或者雪花模型),表通常分爲兩種:事實表和維度表。維度表通常指固定的、變更較少的表,例如聯繫人、物品種類等,通常數據有限。而事實表通常記錄流水,好比銷售清單等,一般隨着時間的增加不斷膨脹。優化

  由於Join操做是對兩個表中key值相同的記錄進行鏈接,在SparkSQL中,對兩個表作Join最直接的方式是先根據key分區,再在每一個分區中把key值相同的記錄拿出來作鏈接操做。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操做,咱們應該儘量的設計Spark應用使其避免大量的shuffle。ui

  當維度表和事實表進行Join操做時,爲了不shuffle,咱們能夠將大小有限的維度表的所有數據分發到每一個節點上,供事實表使用。executor存儲維度表的所有數據,必定程度上犧牲了空間,換取shuffle操做大量的耗時,這在SparkSQL中稱做Broadcast Join,以下圖所示:spa

  

  Table B是較小的表,將其廣播到每一個executor節點上,Table A的每一個partition會經過block manager取到Table A的數據。根據每條記錄的Join Key取到Table B中相對應的記錄,根據Join Type進行操做。這個過程比較簡單,不作贅述。

Broadcast Join的條件有如下幾個:

  1. 被廣播的表須要小於spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M (或者加了broadcast join的hint)

  2. 基表不能被廣播,好比left outer join時,只能廣播右表

  看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,不然數據的冗餘傳輸就遠大於shuffle的開銷;另外,廣播時須要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的內存也是一個考驗。

三,Shuffle Hash Join

  當一側的表比較小時,咱們選擇將其廣播出去以免shuffle,提升性能。但由於被廣播的表首先被collect到driver段,而後被冗餘分發到每一個executor上,因此當表比較大時,採用broadcast join會對driver端和executor端形成較大的壓力。

  但因爲Spark是一個分佈式的計算引擎,能夠經過分區的形式將大批量的數據劃分紅n份較小的數據集進行並行計算。這種思想應用到Join上即是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,SparkSQL將較大表的join分而治之,先將表劃分紅n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在必定程度上減小了driver廣播一側表的壓力,也減小了executor端取整張被廣播表的內存消耗。其原理以下圖:

   

Shuffle Hash Join分爲兩步:

  1. 對兩張表分別按照join keys進行重分區,即shuffle,目的是爲了讓有相同join keys值的記錄分到對應的分區中

  2. 對對應分區中的數據進行join,此處先將小表分區構造爲一張hash表,而後根據大表分區中記錄的join keys值拿出來進行匹配

Shuffle Hash Join的條件有如下幾個:

  1. 分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M 

  2. 基表不能被廣播,好比left outer join時,只能廣播右表

  3. 一側的表要明顯小於另一側,小的一側將被廣播(明顯小於的定義爲3倍小,此處爲經驗值)

  咱們能夠看到,在必定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行從新分區,而且對小表中的分區進行hash化,從而完成join。在保持必定複雜度的基礎上,儘可能減小driver和executor的內存壓力,提高了計算時的穩定性。

四,Sort Merge Join 

  上面介紹的兩種實現對於必定大小的表比較適用,但當兩個表都很是大時,顯然不管適用哪一種都會對計算內存形成很大壓力。這是由於join時二者採起的都是hash join,是將一側的數據徹底加載到內存中,使用hash code取join keys值相等的記錄進行鏈接。

  當兩個表都很是大時,SparkSQL採用了一種全新的方案來對錶進行Join,即Sort Merge Join。這種實現方式不用將一側數據所有加載後再進星hash join,但須要在join前將數據排序,以下圖所示:

  

  能夠看到,首先將兩張表按照join keys進行了從新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區後對每一個分區內的數據進行排序,排序後再對相應的分區內的記錄進行鏈接,以下圖示:

   

  看着很眼熟吧?也很簡單,由於兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;若是不一樣,左邊小就繼續取左邊,反之取右邊。

  能夠看出,不管分區有多大,Sort Merge Join都不用把某一側的數據所有加載到內存中,而是即用即取即丟,從而大大提高了大數據量下sql join的穩定性。

 五,示例

  5.1 broadcast hash join 實例

package cn.edu360.spark08

import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinTest {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .appName("JoinTest")
            .master("local[*]")
            .getOrCreate()
        import spark.implicits._
        // 定義表1
        val df1 = Seq(
            (0, "playing"),
            (1, "with"),
            (2, "join")
        ).toDF("id", "token")
        // 定義表2
        val df2 = Seq(
            (0, "P"),
            (1, "W"),
            (2, "S")
        ).toDF("aid", "atoken")
        // join操做
        val result: DataFrame = df1.join(df2, $"id" === $"aid")
        // 查看執行計劃
        result.explain()
        result.show()
    }
}

  執行結果:

== Physical Plan ==
*(1) BroadcastHashJoin [id#5], [aid#14], Inner, BuildRight
:- LocalTableScan [id#5, token#6]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [aid#14, atoken#15]

+---+-------+---+------+
| id|  token|aid|atoken|
+---+-------+---+------+
|  0|playing|  0|     P|
|  1|   with|  1|     W|
|  2|   join|  2|     S|
+---+-------+---+------+

  從上面的數據能夠看出,默認執行的是BroadcastHashJoin。

  5.2 SortMergeJoin實現

package cn.edu360.spark08

import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinTest {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().appName("JoinTest").master("local[*]").getOrCreate()
        import spark.implicits._
        // 這裏取消BroadcastJoinThreshold 即該值爲-1,則就會使用 SortMergeJoin
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
        val df1 = Seq(
            (0, "playing"),
            (1, "with"),
            (2, "join")
        ).toDF("id", "token")

        val df2 = Seq(
            (0, "P"),
            (1, "W"),
            (2, "S")
        ).toDF("aid", "atoken")

        val result: DataFrame = df1.join(df2, $"id" === $"aid")
        result.explain()
        result.show()
    }
}

  輸出結果:

== Physical Plan ==
*(3) SortMergeJoin [id#5], [aid#14], Inner
:- *(1) Sort [id#5 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#5, 200)
:     +- LocalTableScan [id#5, token#6]
+- *(2) Sort [aid#14 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(aid#14, 200)
      +- LocalTableScan [aid#14, atoken#15]

+---+-------+---+------+
| id|  token|aid|atoken|
+---+-------+---+------+
|  1|   with|  1|     W|
|  2|   join|  2|     S|
|  0|playing|  0|     P|
+---+-------+---+------+

  5.3 關於shuffle hash join

  shuffle在能夠本身定義好分區,而後進行join操做。

相關文章
相關標籤/搜索