SparkSQL 之 Shuffle Join 內核原理及應用深度剖析-Spark商業源碼實戰

本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,若有任何商業交流,可隨時聯繫。算法

1 Spark SQL 堅實後盾DataFrame

  • DataFrame是一個分佈式數據容器,更像傳統數據庫的二維表格,除了數據之外,還掌握數據的結構信息,即schema。同時,與Hive相似,DataFrame也支持嵌套數據類型(struct、array和map)。
  • JSON schema自動推導
  • Hive風格分區表自動識別
  • 充分利用RCFile、ORC、Parquet等列式存儲格式的優點,僅掃描查詢真正涉及的列,忽略其他列的數據。
  • 聚合統計函數支持

2 Spark SQL 源碼包結構(溯本逐源)

主要分爲4類:sql

  • core模塊:處理數據的輸入輸出,好比:把不一樣數據源(RDD,json,Parquet等)獲取到數據,並將查詢結果輸出到DataFrame。
  • catalyst模塊:處理SQL語句的整個過程,包括解析,綁定,優化,物理計劃等查詢優化。
  • hive模塊:對hive數據進行處理。
  • hive-ThriftServer:提供CLI以及JDBC和ODBC接口。

3 Spark SQL catalyst模塊設計思路

(詳細請參看個人SparkSQL源碼解析內容)

catalyst主要組件有數據庫

  • sqlParse => sql語句的語法解析
  • Analyzer => 將不一樣來源的Unresolved Logical Plan和元數據(如hive metastore、Schema catalog)進行綁定,生成resolved Logical Plan
  • optimizer => 根據OptimizationRules,對resolvedLogicalPlan進行合併、列裁剪、過濾器下推等優化做業而轉換成optimized Logical Plan
  • Planner => LogicalPlan轉換成PhysicalPlan
  • CostModel => 根據過去的性能統計數據,選擇最佳的物理執行計劃

4 Hash Join的衍生(劍走偏鋒)

4.1 Hash join 設計思路剖析(總領全局)

  • 第一步:通常狀況下,streamIter爲大表,buildIter爲小表,不用關心哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。
  • 第二步:根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable,位於內存中。
  • 第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功以後再檢查join key 是否相等,最後join在一塊兒
  • 總結 : hash join 只掃描兩表一次,能夠認爲運算複雜度爲o(a+b),效率很是高。笛卡爾集運算複雜度爲a*b。另外,構建的Hash Table最好能所有加載在內存,效率最高,這就決定了hash join算法只適合至少一個小表的join場景,對於兩個大表的join場景並不適用。

4.2 broadcast Hash join 設計思路剖析(大表join極小表)

  • 第一步:通常狀況下,streamIter爲大表,buildIter爲小表,不用關心哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。json

  • 第二步: 先把小表廣播到全部大表分區所在節點,而後根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable微信

  • 第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功以後再檢查join key 是否相等,最後join在一塊兒分佈式

  • 總結 : hash join 只掃描兩表一次,能夠認爲運算複雜度爲o(a+b)。函數

  • 調優性能

    1 buildIter整體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,
      即不知足broadcast join條件
      
     2 開啓嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false
      
     3 每一個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每一個分區來自buildIter的記錄要能放到內存中
      
     4 streamIter的大小是buildIter三倍以上
    複製代碼

4.2 shuffle Hash join 設計思路剖析(大表join小表)

  • 第一步:通常狀況下,streamIter爲大表,buildIter爲小表,不用關心哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。
  • 第二步: 將具備相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
  • 第三步:先把小表廣播到全部大表分區所在節點,而後根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable
  • 第四步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功以後再檢查join key 是否相等,最後join在一塊兒

5 Sort Merge join (橫行無敵)(大表join大表)

  • 第一步:通常狀況下,streamIter爲大表,buildIter爲小表,不用關心哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。
  • 第二步: 將具備相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
  • 第三步: 對streamIter 和 buildIter在shuffle read過程當中先排序,join匹配時按順序查找,匹配結束後沒必要重頭開始,利用shuffle sort特性,查找性能解決了大表對大表的情形。

6 Spark Join 類型詳解

6.0 準備數據集( Justin => 左表有,Rose =>右表有)

學習 Python中單引號,雙引號,3個單引號及3個雙引號的區別請參考:https://blog.csdn.net/woainishifu/article/details/76105667

from pyspark.sql.types import * 

>>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) 
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1| Alice| 18|
|  2|  Andy| 19|
|  3|   Bob| 17|
|  4|Justin| 21|
|  5| Cindy| 20|
+---+------+---+

>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)]) 
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ]) 
>>> df2 = spark.createDataFrame(rdd2, schema2) 
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice|   160|
| Andy|   159|
|  Bob|   170|
|Cindy|   165|
| Rose|   160|
+-----+------+
複製代碼

6.1 inner join

  • inner join是必定要找到左右表中知足join key 條件的記錄,join key都存在的情形。學習

    df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
      df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()
      df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()
      
       >>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
          +---+-----+---+------+
          | id| name|age|height|
          +---+-----+---+------+
          |  1|Alice| 18|   160|
          |  2| Andy| 19|   159|
          |  3|  Bob| 17|   170|
          |  5|Cindy| 20|   165|
          +---+-----+---+------+   
    複製代碼

6.2 left outer join

  • left outer join是以左表爲準,在右表中查找匹配的記錄,若是查找失敗,左錶行Row不變,右表一行Row中全部字段都爲null的記錄。大數據

  • 要求:左表是streamIter,右表是buildIter

    df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()
          
      >>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()
          +---+------+---+------+
          | id|  name|age|height|
          +---+------+---+------+
          |  1| Alice| 18|   160|
          |  2|  Andy| 19|   159|
          |  3|   Bob| 17|   170|
          |  4|Justin| 21|  null|
          |  5| Cindy| 20|   165|
          +---+------+---+------+
    複製代碼

6.3 right outer join

  • right outer join是以右表爲準,在左表中查找匹配的記錄,若是查找失敗,右錶行Row不變,左表一行Row中全部字段都爲null的記錄。

  • 要求:右表是streamIter,左表是buildIter

    df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()
      >>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()
      +----+-----+----+------+
      |  id| name| age|height|
      +----+-----+----+------+
      |null| Rose|null|   160|
      |   1|Alice|  18|   160|
      |   2| Andy|  19|   159|
      |   3|  Bob|  17|   170|
      |   5|Cindy|  20|   165|
      +----+-----+----+------+
    複製代碼

6.4 full outer join

  • full outer join僅採用sort merge join實現,左邊和右表既要做爲streamIter,又要做爲buildIter

  • 左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,若是key相等,則joinrowA和rowB,並將rowA和rowB分別更新到左表和右表的下一條記錄。

  • 若是keyA<keyB,說明右表中沒有與左表rowA對應的記錄,那麼joinrowA與nullRow。

  • 將rowA更新到左表的下一條記錄;若是keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那麼joinnullRow與rowB。

  • 將rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄所有處理完。

    >>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()
          +----+------+----+------+
          |  id|  name| age|height|
          +----+------+----+------+
          |null|  Rose|null|   160|
          |   1| Alice|  18|   160|
          |   2|  Andy|  19|   159|
          |   3|   Bob|  17|   170|
          |   4|Justin|  21|  null|
          |   5| Cindy|  20|   165|
          +----+------+----+------+
    複製代碼

6.5 left semi join

left semi join是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則僅返回左表Row的記錄,不然返回null。

6.6 left anti join

left anti join與left semi join相反,是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則返回null,不然僅返回左邊的記錄

6.6 row_number().over()

from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()
     +---+------+---+---+
    | id|  name|age| rn|
    +---+------+---+---+
    |  1| Alice| 18|  1|
    |  1| Cindy| 20|  2|
    |  1|Justin| 21|  3|
    |  3|   Bob| 17|  1|
    |  2|  Andy| 19|  1|
    +---+------+---+---+

df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()

    +---+------+---+---+
    | id|  name|age| rn|
    +---+------+---+---+
    |  3|   Bob| 17|  1|
    |  1| Alice| 18|  1|
    |  2|  Andy| 19|  1|
    |  1| Cindy| 20|  2|
    |  1|Justin| 21|  3|
    +---+------+---+---+
複製代碼

7 結語

一直想深刻挖掘一下SparkSQL內部join原理,終於有時間詳細的理一下 Shuffle Join 。做者還準備進一步研究Spark SQL 內核原理,敬請期待個人Spark SQL源碼剖析系列。大數據商業實戰社區微信公衆號即將開啓,敬請關注,謝謝!

在這裏插入圖片描述
秦凱新 於深圳 201811200130
相關文章
相關標籤/搜索