本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,若有任何商業交流,可隨時聯繫。算法
主要分爲4類:sql
catalyst主要組件有數據庫
第一步:通常狀況下,streamIter爲大表,buildIter爲小表,不用關心哪一個表爲streamIter,哪一個表爲buildIter,這個spark會根據join語句自動幫咱們完成。json
第二步: 先把小表廣播到全部大表分區所在節點,而後根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable微信
第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功以後再檢查join key 是否相等,最後join在一塊兒分佈式
總結 : hash join 只掃描兩表一次,能夠認爲運算複雜度爲o(a+b)。函數
調優性能
1 buildIter整體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,
即不知足broadcast join條件
2 開啓嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false
3 每一個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每一個分區來自buildIter的記錄要能放到內存中
4 streamIter的大小是buildIter三倍以上
複製代碼
學習 Python中單引號,雙引號,3個單引號及3個雙引號的區別請參考:https://blog.csdn.net/woainishifu/article/details/76105667
from pyspark.sql.types import *
>>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()
+---+------+---+
| id| name|age|
+---+------+---+
| 1| Alice| 18|
| 2| Andy| 19|
| 3| Bob| 17|
| 4|Justin| 21|
| 5| Cindy| 20|
+---+------+---+
>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)])
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ])
>>> df2 = spark.createDataFrame(rdd2, schema2)
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice| 160|
| Andy| 159|
| Bob| 170|
|Cindy| 165|
| Rose| 160|
+-----+------+
複製代碼
inner join是必定要找到左右表中知足join key 條件的記錄,join key都存在的情形。學習
df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()
df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()
>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
+---+-----+---+------+
| id| name|age|height|
+---+-----+---+------+
| 1|Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 5|Cindy| 20| 165|
+---+-----+---+------+
複製代碼
left outer join是以左表爲準,在右表中查找匹配的記錄,若是查找失敗,左錶行Row不變,右表一行Row中全部字段都爲null的記錄。大數據
要求:左表是streamIter,右表是buildIter
df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()
>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()
+---+------+---+------+
| id| name|age|height|
+---+------+---+------+
| 1| Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 4|Justin| 21| null|
| 5| Cindy| 20| 165|
+---+------+---+------+
複製代碼
right outer join是以右表爲準,在左表中查找匹配的記錄,若是查找失敗,右錶行Row不變,左表一行Row中全部字段都爲null的記錄。
要求:右表是streamIter,左表是buildIter
df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()
>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()
+----+-----+----+------+
| id| name| age|height|
+----+-----+----+------+
|null| Rose|null| 160|
| 1|Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 5|Cindy| 20| 165|
+----+-----+----+------+
複製代碼
full outer join僅採用sort merge join實現,左邊和右表既要做爲streamIter,又要做爲buildIter
左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,若是key相等,則joinrowA和rowB,並將rowA和rowB分別更新到左表和右表的下一條記錄。
若是keyA<keyB,說明右表中沒有與左表rowA對應的記錄,那麼joinrowA與nullRow。
將rowA更新到左表的下一條記錄;若是keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那麼joinnullRow與rowB。
將rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄所有處理完。
>>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()
+----+------+----+------+
| id| name| age|height|
+----+------+----+------+
|null| Rose|null| 160|
| 1| Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 4|Justin| 21| null|
| 5| Cindy| 20| 165|
+----+------+----+------+
複製代碼
left semi join是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則僅返回左表Row的記錄,不然返回null。
left anti join與left semi join相反,是以左表爲準,在右表中查找匹配的記錄,若是查找成功,則返回null,不然僅返回左邊的記錄
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()
+---+------+---+---+
| id| name|age| rn|
+---+------+---+---+
| 1| Alice| 18| 1|
| 1| Cindy| 20| 2|
| 1|Justin| 21| 3|
| 3| Bob| 17| 1|
| 2| Andy| 19| 1|
+---+------+---+---+
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()
+---+------+---+---+
| id| name|age| rn|
+---+------+---+---+
| 3| Bob| 17| 1|
| 1| Alice| 18| 1|
| 2| Andy| 19| 1|
| 1| Cindy| 20| 2|
| 1|Justin| 21| 3|
+---+------+---+---+
複製代碼
一直想深刻挖掘一下SparkSQL內部join原理,終於有時間詳細的理一下 Shuffle Join 。做者還準備進一步研究Spark SQL 內核原理,敬請期待個人Spark SQL源碼剖析系列。大數據商業實戰社區微信公衆號即將開啓,敬請關注,謝謝!