Spark 系列(十二)—— Spark SQL JOIN 操做

1、 數據準備

本文主要介紹 Spark SQL 的多表鏈接,須要預先準備測試數據。分別建立員工和部門的 Datafame,並註冊爲臨時視圖,代碼以下:java

val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()

val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")

val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
複製代碼

兩表的主要字段以下:git

emp 員工表
 |-- ENAME: 員工姓名
 |-- DEPTNO: 部門編號
 |-- EMPNO: 員工編號
 |-- HIREDATE: 入職時間
 |-- JOB: 職務
 |-- MGR: 上級編號
 |-- SAL: 薪資
 |-- COMM: 獎金  
複製代碼
dept 部門表
 |-- DEPTNO: 部門編號
 |-- DNAME:  部門名稱
 |-- LOC:    部門所在城市
複製代碼

注:emp.json,dept.json 能夠在本倉庫的resources 目錄進行下載。github

2、鏈接類型

Spark 中支持多種鏈接類型:sql

  • Inner Join : 內鏈接;
  • Full Outer Join : 全外鏈接;
  • Left Outer Join : 左外鏈接;
  • Right Outer Join : 右外鏈接;
  • Left Semi Join : 左半鏈接;
  • Left Anti Join : 左反鏈接;
  • Natural Join : 天然鏈接;
  • Cross (or Cartesian) Join : 交叉 (或笛卡爾) 鏈接。

其中內,外鏈接,笛卡爾積均與普通關係型數據庫中的相同,以下圖所示:數據庫

https://github.com/heibaiying

這裏解釋一下左半鏈接和左反鏈接,這兩個鏈接等價於關係型數據庫中的 INNOT IN 字句:json

-- LEFT SEMI JOIN
SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno
-- 等價於以下的 IN 語句
SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)

-- LEFT ANTI JOIN
SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno
-- 等價於以下的 IN 語句
SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)
複製代碼

全部鏈接類型的示例代碼以下:網絡

2.1 INNER JOIN

// 1.定義鏈接表達式
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2.鏈接查詢 
empDF.join(deptDF,joinExpression).select("ename","dname").show()

// 等價 SQL 以下:
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.2 FULL OUTER JOIN

empDF.join(deptDF, joinExpression, "outer").show()
spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.3 LEFT OUTER JOIN

empDF.join(deptDF, joinExpression, "left_outer").show()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.4 RIGHT OUTER JOIN

empDF.join(deptDF, joinExpression, "right_outer").show()
spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.5 LEFT SEMI JOIN

empDF.join(deptDF, joinExpression, "left_semi").show()
spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.6 LEFT ANTI JOIN

empDF.join(deptDF, joinExpression, "left_anti").show()
spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.7 CROSS JOIN

empDF.join(deptDF, joinExpression, "cross").show()
spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼

2.8 NATURAL JOIN

天然鏈接是在兩張表中尋找那些數據類型和列名都相同的字段,而後自動地將他們鏈接起來,並返回全部符合條件的結果。app

spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()
複製代碼

如下是一個天然鏈接的查詢結果,程序自動推斷出使用兩張表都存在的 dept 列進行鏈接,其實際等價於:ide

spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show() 複製代碼

https://github.com/heibaiying

因爲天然鏈接經常會產生不可預期的結果,因此並不推薦使用。測試

3、鏈接的執行

在對大表與大表之間進行鏈接操做時,一般都會觸發 Shuffle Join,兩表的全部分區節點會進行 All-to-All 的通信,這種查詢一般比較昂貴,會對網絡 IO 會形成比較大的負擔。

https://github.com/heibaiying

而對於大表和小表的鏈接操做,Spark 會在必定程度上進行優化,若是小表的數據量小於 Worker Node 的內存空間,Spark 會考慮將小表的數據廣播到每個 Worker Node,在每一個工做節點內部執行鏈接計算,這能夠下降網絡的 IO,但會加大每一個 Worker Node 的 CPU 負擔。

是否採用廣播方式進行 Join 取決於程序內部對小表的判斷,若是想明確使用廣播方式進行 Join,則能夠在 DataFrame API 中使用 broadcast 方法指定須要廣播的小表:

empDF.join(broadcast(deptDF), joinExpression).show()
複製代碼

參考資料

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索