本文主要介紹 Spark SQL 的多表鏈接,須要預先準備測試數據。分別建立員工和部門的 Datafame,並註冊爲臨時視圖,代碼以下:java
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")
val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
複製代碼
兩表的主要字段以下:git
emp 員工表
|-- ENAME: 員工姓名
|-- DEPTNO: 部門編號
|-- EMPNO: 員工編號
|-- HIREDATE: 入職時間
|-- JOB: 職務
|-- MGR: 上級編號
|-- SAL: 薪資
|-- COMM: 獎金
複製代碼
dept 部門表
|-- DEPTNO: 部門編號
|-- DNAME: 部門名稱
|-- LOC: 部門所在城市
複製代碼
注:emp.json,dept.json 能夠在本倉庫的resources 目錄進行下載。github
Spark 中支持多種鏈接類型:sql
其中內,外鏈接,笛卡爾積均與普通關係型數據庫中的相同,以下圖所示:數據庫
這裏解釋一下左半鏈接和左反鏈接,這兩個鏈接等價於關係型數據庫中的 IN
和 NOT IN
字句:json
-- LEFT SEMI JOIN
SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno
-- 等價於以下的 IN 語句
SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)
-- LEFT ANTI JOIN
SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno
-- 等價於以下的 IN 語句
SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)
複製代碼
全部鏈接類型的示例代碼以下:網絡
// 1.定義鏈接表達式
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2.鏈接查詢
empDF.join(deptDF,joinExpression).select("ename","dname").show()
// 等價 SQL 以下:
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "outer").show()
spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "left_outer").show()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "right_outer").show()
spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "left_semi").show()
spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "left_anti").show()
spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
empDF.join(deptDF, joinExpression, "cross").show()
spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()
複製代碼
天然鏈接是在兩張表中尋找那些數據類型和列名都相同的字段,而後自動地將他們鏈接起來,並返回全部符合條件的結果。app
spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()
複製代碼
如下是一個天然鏈接的查詢結果,程序自動推斷出使用兩張表都存在的 dept 列進行鏈接,其實際等價於:ide
spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show() 複製代碼
因爲天然鏈接經常會產生不可預期的結果,因此並不推薦使用。測試
在對大表與大表之間進行鏈接操做時,一般都會觸發 Shuffle Join
,兩表的全部分區節點會進行 All-to-All
的通信,這種查詢一般比較昂貴,會對網絡 IO 會形成比較大的負擔。
而對於大表和小表的鏈接操做,Spark 會在必定程度上進行優化,若是小表的數據量小於 Worker Node 的內存空間,Spark 會考慮將小表的數據廣播到每個 Worker Node,在每一個工做節點內部執行鏈接計算,這能夠下降網絡的 IO,但會加大每一個 Worker Node 的 CPU 負擔。
是否採用廣播方式進行 Join
取決於程序內部對小表的判斷,若是想明確使用廣播方式進行 Join
,則能夠在 DataFrame API 中使用 broadcast
方法指定須要廣播的小表:
empDF.join(broadcast(deptDF), joinExpression).show()
複製代碼
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南