Joining two datasets is a very common operation in data analysis. In this article I covered the five join strategies that Spark supports; in this post I will walk through the join types (Join Type) that Apache Spark supports.
As of Apache Spark 3.0, the following seven join types are supported:
•INNER JOIN
•CROSS JOIN
•LEFT OUTER JOIN
•RIGHT OUTER JOIN
•FULL OUTER JOIN
•LEFT SEMI JOIN
•LEFT ANTI JOIN
In the implementation, these seven join types map to the following classes:
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}
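As the pattern match above shows, Spark lower-cases the type string and strips underscores before matching, which is why several aliases resolve to the same join type. A minimal illustration, assuming a spark-shell session (JoinType lives in the internal org.apache.spark.sql.catalyst.plans package, so this is shown purely to demonstrate the normalization, not as an API you would normally call):

import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter}

// All of these normalize to the same JoinType after
// lower-casing and underscore removal:
JoinType("left_outer") == LeftOuter  // true
JoinType("LEFT") == LeftOuter        // true
JoinType("leftOuter") == LeftOuter   // true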
Today I'm not going to dig into how these seven join types are implemented at the code level; instead, I'll explain what each join means and how to use it, from a data analyst's perspective. Before we start, assume we have a customer table and an order table, as follows:
scala> val order = spark.sparkContext.parallelize(Seq(
     |   (1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400)
     | )).toDF("paymentId", "customerId", "amount")
order: org.apache.spark.sql.DataFrame = [paymentId: int, customerId: int ... 1 more field]

scala> order.show
+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        2|       102|  1110|
|        3|       103|   500|
|        4|       102|   400|
+---------+----------+------+

scala> val customer = spark.sparkContext.parallelize(Seq(
     |   (101, "iteblog"), (102, "iteblog_hadoop"), (103, "iteblog001"), (104, "iteblog002"), (105, "iteblog003"), (106, "iteblog004")
     | )).toDF("customerId", "name")
customer: org.apache.spark.sql.DataFrame = [customerId: int, name: string]

scala> customer.show
+----------+--------------+
|customerId|          name|
+----------+--------------+
|       101|       iteblog|
|       102|iteblog_hadoop|
|       103|    iteblog001|
|       104|    iteblog002|
|       105|    iteblog003|
|       106|    iteblog004|
+----------+--------------+

With the data ready, let's now go through these join types one by one.
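If you want to reproduce the examples outside of spark-shell, a minimal standalone setup might look like the sketch below (the object and app names are my own placeholders; note that toDF on a Seq of tuples needs the spark.implicits._ import, which spark-shell performs for you):

import org.apache.spark.sql.SparkSession

object JoinTypesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-join-types")  // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    // Needed for toDF on a Seq of tuples; spark-shell
    // imports this automatically.
    import spark.implicits._

    val order = Seq((1, 101, 2500), (2, 102, 1110), (3, 103, 500), (4, 102, 400))
      .toDF("paymentId", "customerId", "amount")
    val customer = Seq((101, "iteblog"), (102, "iteblog_hadoop"), (103, "iteblog001"),
        (104, "iteblog002"), (105, "iteblog003"), (106, "iteblog004"))
      .toDF("customerId", "name")

    order.show()
    customer.show()
    spark.stop()
  }
}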
In Spark, if no join type is specified, the default is INNER JOIN. An INNER JOIN returns only the rows that satisfy the join condition. This is probably the join most people use:
scala> val df = customer.join(order, "customerId")
df: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 2 more fields]

scala> df.show
+----------+--------------+---------+------+
|customerId|          name|paymentId|amount|
+----------+--------------+---------+------+
|       101|       iteblog|        1|  2500|
|       103|    iteblog001|        3|   500|
|       102|iteblog_hadoop|        2|  1110|
|       102|iteblog_hadoop|        4|   400|
+----------+--------------+---------+------+
As you can see, when no join type is specified the result is indeed an INNER JOIN, and Spark automatically removed the duplicate customerId column that appears in both tables. Represented as a diagram, INNER JOIN looks like this:
The pink area in the figure above is the result of the INNER JOIN.
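The same inner join can also be written with an explicit type string, or with an explicit join expression. A small sketch using the DataFrames defined above; note that unlike the Seq("customerId") form, the expression form keeps both customerId columns, so they must be disambiguated afterwards:

// Explicit join type string; equivalent to the default.
val inner1 = customer.join(order, Seq("customerId"), "inner")

// Expression form: both customerId columns are kept in the
// output, so refer to them via the parent DataFrames.
val inner2 = customer.join(order, customer("customerId") === order("customerId"), "inner")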
CROSS JOIN is also known as the Cartesian product (Cartesian Product): every row of the left table is joined with every row of the right table, so joining m rows with n rows produces m*n rows (here, 6 customers × 4 orders = 24 rows). For that reason, try to avoid this join type in production. Here is an example of CROSS JOIN:
scala> val df = customer.crossJoin(order)
df: org.apache.spark.sql.DataFrame = [customerId: int, name: string ... 3 more fields]

scala> df.show
+----------+--------------+---------+----------+------+
|customerId|          name|paymentId|customerId|amount|
+----------+--------------+---------+----------+------+
|       101|       iteblog|        1|       101|  2500|
|       101|       iteblog|        2|       102|  1110|
|       101|       iteblog|        3|       103|   500|
|       101|       iteblog|        4|       102|   400|
|       102|iteblog_hadoop|        1|       101|  2500|
|       102|iteblog_hadoop|        2|       102|  1110|
|       102|iteblog_hadoop|        3|       103|   500|
|       102|iteblog_hadoop|        4|       102|   400|
|       103|    iteblog001|        1|       101|  2500|
|       103|    iteblog001|        2|       102|  1110|
|       103|    iteblog001|        3|       103|   500|
|       103|    iteblog001|        4|       102|   400|
|       104|    iteblog002|        1|       101|  2500|
|       104|    iteblog002|        2|       102|  1110|
|       104|    iteblog002|        3|       103|   500|
|       104|    iteblog002|        4|       102|   400|
|       105|    iteblog003|        1|       101|  2500|
|       105|    iteblog003|        2|       102|  1110|
|       105|    iteblog003|        3|       103|   500|
|       105|    iteblog003|        4|       102|   400|
+----------+--------------+---------+----------+------+
only showing top 20 rows
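A quick sanity check of the m*n row count on our data:

// 6 customers x 4 orders = 24 rows in the Cartesian product.
customer.crossJoin(order).count()  // 24

One related note: in Spark 2.x an implicit Cartesian product (a join without any join condition) was rejected unless spark.sql.crossJoin.enabled was set to true; Spark 3.0 enables it by default, and the explicit crossJoin method above works either way.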
LEFT OUTER JOIN is equivalent to LEFT JOIN: every row of the left table appears in the result, combined with the matching right-table rows where the join condition holds, and with nulls where it does not. The following three ways of writing it are equivalent:
val leftJoinDf = customer.join(order, Seq("customerId"), "left_outer")
val leftJoinDf = customer.join(order, Seq("customerId"), "leftouter")
val leftJoinDf = customer.join(order, Seq("customerId"), "left")

scala> leftJoinDf.show
+----------+--------------+---------+------+
|customerId|          name|paymentId|amount|
+----------+--------------+---------+------+
|       101|       iteblog|        1|  2500|
|       103|    iteblog001|        3|   500|
|       102|iteblog_hadoop|        2|  1110|
|       102|iteblog_hadoop|        4|   400|
|       105|    iteblog003|     null|  null|
|       106|    iteblog004|     null|  null|
|       104|    iteblog002|     null|  null|
+----------+--------------+---------+------+
Represented as a diagram, LEFT OUTER JOIN looks like this: all rows of the left table are kept, while right-table rows show up only when they match.
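For reference, here is a sketch of the same left outer join in Spark SQL; it first registers the DataFrames as temp views (the LEFT SEMI JOIN section below does the same with the older registerTempTable):

customer.createOrReplaceTempView("customer")
order.createOrReplaceTempView("order")

val leftJoinSql = spark.sql(
  """SELECT c.customerId, c.name, o.paymentId, o.amount
    |FROM customer c
    |LEFT OUTER JOIN order o ON c.customerId = o.customerId""".stripMargin)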
Similar to LEFT OUTER JOIN, RIGHT OUTER JOIN is equivalent to RIGHT JOIN, and the following three ways of writing it are also equivalent:
val rightJoinDf = order.join(customer, Seq("customerId"), "right")
val rightJoinDf = order.join(customer, Seq("customerId"), "right_outer")
val rightJoinDf = order.join(customer, Seq("customerId"), "rightouter")

scala> rightJoinDf.show
+----------+---------+------+--------------+
|customerId|paymentId|amount|          name|
+----------+---------+------+--------------+
|       101|        1|  2500|       iteblog|
|       103|        3|   500|    iteblog001|
|       102|        2|  1110|iteblog_hadoop|
|       102|        4|   400|iteblog_hadoop|
|       105|     null|  null|    iteblog003|
|       106|     null|  null|    iteblog004|
|       104|     null|  null|    iteblog002|
+----------+---------+------+--------------+
Represented as a diagram, RIGHT OUTER JOIN looks like this: all rows of the right table are kept, while left-table rows show up only when they match.
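A right outer join is just a left outer join with the two tables swapped. Modulo column order, the following two statements on our data produce the same rows:

// Same rows; only the column order differs.
val rightDf = order.join(customer, Seq("customerId"), "right")
val leftDf  = customer.join(order, Seq("customerId"), "left")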
FULL OUTER JOIN keeps all rows from both tables, matching them where possible and filling the other side with nulls where not. It can be written in any of the following four ways:
val fullJoinDf = order.join(customer, Seq("customerId"), "outer")
val fullJoinDf = order.join(customer, Seq("customerId"), "full")
val fullJoinDf = order.join(customer, Seq("customerId"), "full_outer")
val fullJoinDf = order.join(customer, Seq("customerId"), "fullouter")

scala> fullJoinDf.show
+----------+---------+------+--------------+
|customerId|paymentId|amount|          name|
+----------+---------+------+--------------+
|       101|        1|  2500|       iteblog|
|       103|        3|   500|    iteblog001|
|       102|        2|  1110|iteblog_hadoop|
|       102|        4|   400|iteblog_hadoop|
|       105|     null|  null|    iteblog003|
|       106|     null|  null|    iteblog004|
|       104|     null|  null|    iteblog002|
+----------+---------+------+--------------+
FULL OUTER JOIN can be represented by the following diagram. (In this example the result happens to match the RIGHT OUTER JOIN above, because every order has a matching customer; an order with no customer would also appear here, with null in the name column.)
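The SQL equivalent, assuming the customer and order temp views from the earlier sketch (FULL OUTER JOIN ... USING merges the join column, just like the Seq form of the DataFrame API):

val fullJoinSql = spark.sql(
  """SELECT *
    |FROM order
    |FULL OUTER JOIN customer USING (customerId)""".stripMargin)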
LEFT SEMI JOIN is probably the least familiar of the seven. It returns a left-table row only when at least one right-table row matches it, and the output contains only the left table's columns; the right table's columns never appear. The following three ways of writing it are equivalent:
val leftSemiJoinDf = order.join(customer, Seq("customerId"), "leftsemi")
val leftSemiJoinDf = order.join(customer, Seq("customerId"), "left_semi")
val leftSemiJoinDf = order.join(customer, Seq("customerId"), "semi")

scala> leftSemiJoinDf.show
+----------+---------+------+
|customerId|paymentId|amount|
+----------+---------+------+
|       101|        1|  2500|
|       103|        3|   500|
|       102|        2|  1110|
|       102|        4|   400|
+----------+---------+------+
As the result above shows, LEFT SEMI JOIN can also be rewritten with IN/EXISTS:
scala> order.registerTempTable("order")
warning: there was one deprecation warning (since 2.0.0); for details, enable `:setting -deprecation' or `:replay -deprecation'

scala> customer.registerTempTable("customer")
warning: there was one deprecation warning (since 2.0.0); for details, enable `:setting -deprecation' or `:replay -deprecation'

scala> val r = spark.sql("select * from order where customerId in (select customerId from customer)")
r: org.apache.spark.sql.DataFrame = [paymentId: int, customerId: int ... 1 more field]

scala> r.show
+---------+----------+------+
|paymentId|customerId|amount|
+---------+----------+------+
|        1|       101|  2500|
|        3|       103|   500|
|        2|       102|  1110|
|        4|       102|   400|
+---------+----------+------+
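Two side notes on the transcript above. The deprecation warnings come from registerTempTable, whose modern replacement is createOrReplaceTempView. And since the semi join can be expressed with IN or EXISTS, here is the correlated-EXISTS form of the same query, a sketch assuming the same views:

// Modern replacement for the deprecated registerTempTable:
order.createOrReplaceTempView("order")
customer.createOrReplaceTempView("customer")

// Correlated EXISTS form of the LEFT SEMI JOIN:
val semiSql = spark.sql(
  """SELECT * FROM order o
    |WHERE EXISTS (SELECT 1 FROM customer c
    |              WHERE c.customerId = o.customerId)""".stripMargin)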
LEFT SEMI JOIN can be represented by the following diagram:
LEFT ANTI JOIN is the opposite of LEFT SEMI JOIN: it returns only the left-table rows that have no match in the right table. The following three ways of writing it are also equivalent:
val leftAntiJoinDf = customer.join(order, Seq("customerId"), "leftanti")
val leftAntiJoinDf = customer.join(order, Seq("customerId"), "left_anti")
val leftAntiJoinDf = customer.join(order, Seq("customerId"), "anti")

scala> leftAntiJoinDf.show
+----------+----------+
|customerId|      name|
+----------+----------+
|       105|iteblog003|
|       106|iteblog004|
|       104|iteblog002|
+----------+----------+
Similarly, LEFT ANTI JOIN can be rewritten with NOT IN:
scala> val r = spark.sql("select * from customer where customerId not in (select customerId from order)") r: org.apache.spark.sql.DataFrame = [customerId: int, name: string] scala> r.show +----------+----------+ |customerId| name| +----------+----------+ | 104|iteblog002| | 105|iteblog003| | 106|iteblog004| +----------+----------+
LEFT ANTI JOIN can be represented by the following diagram:
That wraps up this quick tour of the seven join types in Spark; pick the one that fits your business scenario. That's all for today, and thanks for your support.