【大數據】SparkSql 鏈接查詢中的謂詞下推處理 (一)

本文首發於 vivo互聯網技術 微信公衆號 https://mp.weixin.qq.com/s/YPN85WBNcnhk8xKjTPTa2gsql

做者:李勇數據庫

目錄:微信

1.SparkSql網絡

2.鏈接查詢和鏈接條件架構

3.謂詞下推框架

4.內鏈接查詢中的謂詞下推規則分佈式

4.1.Join後條件經過AND鏈接函數

4.2.Join後條件經過OR鏈接oop

4.3.分區表使用OR鏈接過濾條件優化

1.SparkSql

SparkSql 是架構在 Spark 計算框架之上的分佈式 Sql 引擎,使用 DataFrame 和 DataSet 承載結構化和半結構化數據來實現數據複雜查詢處理,提供的 DSL能夠直接使用 scala 語言完成 Sql 查詢,同時也使用  thriftserver 提供服務化的 Sql 查詢功能。

SparkSql 提供了 DataSource API ,用戶經過這套 API 能夠本身開發一套 Connector,直接查詢各種數據源,數據源包括 NoSql、RDBMS、搜索引擎以及 HDFS 等分佈式文件系統上的文件等。和 SparkSql 相似的系統有 Hive、PrestoDB 以及 Impala,這類系統都屬於所謂的" Sql on Hadoop "系統,每一個都至關火爆,畢竟在這個不搞 SQL 就是耍流氓的年代,沒 SQL 確實很難找到用戶使用。

2.鏈接查詢和鏈接條件

Sql中的鏈接查詢(join),主要分爲內鏈接查詢(inner join)、外鏈接查詢(outter join)和半鏈接查詢(semi join),具體的區別能夠參考wiki的解釋。

鏈接條件(join condition),則是指當這個條件知足時兩表的兩行數據才能"join"在一塊兒被返回,例若有以下查詢:

其中的"LT.id=RT.idAND LT.id>1"這部分條件被稱爲"join中條件",直接用來判斷被join的兩表的兩行記錄可否被join在一塊兒,若是不知足這個條件,兩表的這兩行記錄並不是所有被踢出局,而是根據鏈接查詢類型的不一樣有不一樣的處理,因此這並不是一個單表的過濾過程或者兩個表的的「聯合過濾」過程;而where後的"RT.id>2"這部分被稱爲"join後條件",這裏雖然成爲"join後條件",可是並不是必定要在join後才能去過濾數據,只是說明若是在join後進行過濾,確定能夠獲得一個正確的結果,這也是咱們後邊分析問題時獲得正確結果的基準方法。

3.謂詞下推

所謂謂詞(predicate),英文定義是這樣的:A predicate is a function that returns bool (or something that can be implicitly converted to bool),也就是返回值是true或者false的函數,使用過scala或者spark的同窗都知道有個filter方法,這個高階函數傳入的參數就是一個返回true或者false的函數。

可是若是是在sql語言中,沒有方法,只有表達式。where後邊的表達式起的做用正是過濾的做用,而這部分語句被sql層解析處理後,在數據庫內部正是以謂詞的形式呈現的。

那麼問題來了,謂詞爲何要下推呢? SparkSql中的謂詞下推有兩層含義,第一層含義是指由誰來完成數據過濾,第二層含義是指什麼時候完成數據過濾。要解答這兩個問題咱們須要瞭解SparkSql的Sql語句處理邏輯,大體能夠把SparkSql中的查詢處理流程作以下的劃分:

SparkSql首先會對輸入的Sql語句進行一系列的分析(Analyse),包括詞法解析(能夠理解爲搜索引擎中的分詞這個過程)、語法分析以及語義分析(例如判斷database或者table是否存在、group by必須和聚合函數結合等規則);以後是執行計劃的生成,包括邏輯計劃和物理計劃。其中在邏輯計劃階段會有不少的優化,對謂詞的處理就在這個階段完成;而物理計劃則是RDD的DAG圖的生成過程;這兩步完成以後則是具體的執行了(也就是各類重量級的計算邏輯,例如join、groupby、filter以及distinct等),這就會有各類物理操做符(RDD的Transformation)的亂入。

可以完成數據過濾的主體有兩個,第一是分佈式Sql層(在execute階段),第二個是數據源。那麼謂詞下推的第一層含義就是指由Sql層的Filter操做符來完成過濾,仍是由Scan操做符在掃描階段完成過濾。

上邊提到,咱們能夠經過封裝SparkSql的Data Source API完成各種數據源的查詢,那麼若是底層數據源沒法高效完成數據的過濾,就會執行全局掃描,把每條相關的數據都交給SparkSql的Filter操做符完成過濾,雖然SparkSql使用的Code Generation技術極大的提升了數據過濾的效率,可是這個過程沒法避免大量數據的磁盤讀取,甚至在某些狀況下會涉及網絡IO(例如數據非本地化存儲時);若是底層數據源在進行掃描時能很是快速的完成數據的過濾,那麼就會把過濾交給底層數據源來完成(至於哪些數據源能高效完成數據的過濾以及SparkSql又是如何完成高效數據過濾的則不是本文討論的重點,會在其餘系列的文章中介紹)。

那麼謂詞下推第二層含義,即什麼時候完成數據過濾則通常是在指鏈接查詢中,是先對單表數據進行過濾再和其餘錶鏈接仍是在先把多表進行鏈接再對鏈接後的臨時表進行過濾,則是本系列文章要分析和討論的重點。

4.內鏈接查詢中的謂詞下推規則

假設咱們有兩張表,表結構很簡單,數據也都只有兩條,可是足以講清楚咱們的下推規則,兩表以下,一個lefttable,一個righttable:

4.1.Join後條件經過AND鏈接

先來看一條查詢語句:

這個查詢是一個內鏈接查詢,join後條件是用and鏈接的兩個表的過濾條件,假設咱們不下推,而是先作內鏈接判斷,這時是能夠獲得正確結果的,步驟以下:

  1. 左表id爲1的行在右表中能夠找到,即這兩行數據能夠"join"在一塊兒

  2. 左表id爲2的行在右表中能夠找到,這兩行也能夠"join"在一塊兒

至此,join的臨時結果表(之因此是臨時表,由於尚未進行過濾)以下:

而後使用where條件進行過濾,顯然臨時表中的第一行不知足條件,被過濾掉,最後結果以下:

來看看先進行謂詞下推的狀況。先對兩表進行過濾,過濾的結果分別以下:

而後再對這兩個過濾後的表進行內鏈接處理,結果以下:

可見,這和先進行join再過濾獲得的結果一致。

4.2.Join後條件經過OR鏈接

再來看一條查詢語句:

咱們先進行join處理,臨時表的結果以下:

而後使用where條件進行過濾,最終查詢結果以下:

若是咱們先使用where條件後每一個表各自的過濾條件進行過濾,那麼兩表的過濾結果以下:

而後對這兩個臨時表進行內鏈接處理,結果以下:

表格有問題吧,只有字段名,沒有字段值,怎麼回事?是的,你沒看錯,確實沒有值,由於左表過濾結果只有id爲1的行,右表過濾結果只有id爲2的行,這兩行是不能內鏈接上的,因此沒有結果。

那麼爲何where條件中兩表的條件被or鏈接就會出現錯誤的查詢結果呢?分析緣由主要是由於,對於or兩側的過濾條件,任何一個知足條件便可以返回TRUE,那麼對於"LT.value = 'two' OR RT.value = 'two' "這個查詢條件,若是使用LT.value='two'把只有LT.value爲'two'的左表記錄過濾出來,那麼對於左表中LT.value不爲two的行,他們可能在跟右表使用id字段鏈接上以後,右表的RT.value剛好爲two,也知足"LT.value = 'two' OR RT.value = 'two' ",可是惋惜呀惋惜,這行記錄由於以前的粗暴處理,已經被過濾掉,結果就是獲得了錯誤的查詢結果。因此這種狀況下謂詞是不能下推的。

可是OR鏈接兩表join後條件也有兩個例外,這裏順便分析第一個例外。第一個例外是過濾條件字段剛好爲Join字段,好比以下的查詢:

在這個查詢中,join後條件依然是使用OR鏈接兩表的過濾條件,不一樣的是,join中條件再也不是id相等,而是value字段相等,也就是說過濾條件字段剛好就是join條件字段。你們能夠自行採用上邊的分步法分析謂詞下推和不下推時的查詢結果,獲得的結果是相同的。

咱們來看看上邊不能下推時出現的狀況在這種查詢裏會不會出現。對於左表,若是使用LT.value='two'過濾掉不符合條件的其餘行,那麼由於join條件字段也是value字段,說明在左表中LT.value不等於two的行,在右表中也不能等於two,不然就不知足"LT.value=RT.value"了。這裏其實有一個條件傳遞的過程,經過join中條件,已經在邏輯上提早把兩表整合成了一張表。

至於第二個例外,則涉及了SparkSql中的一個優化,因此須要單獨介紹。

4.3.分區表使用OR鏈接過濾條件

若是兩個表都是分區表,會出現什麼狀況呢?咱們先來看以下的查詢:

此時左表和右表都再也不是普通的表,而是分區表,分區字段是pt,按照日期進行數據分區。同時兩表查詢條件依然使用OR進行鏈接。試想,若是不能提早對兩表進行過濾,那麼會有很是巨量的數據要首先進行鏈接處理,這個代價是很是大的。可是若是按照咱們在2中的分析,使用OR鏈接兩表的過濾條件,又不能隨意的進行謂詞下推,那要如何處理呢?SparkSql在這裏使用了一種叫作「分區裁剪」的優化手段,即把分區並不看作普通的過濾條件,而是使用了「一刀切」的方法,把不符合查詢分區條件的目錄直接排除在待掃描的目錄以外。

咱們知道分區表在HDFS上是按照目錄來存儲一個分區的數據的,那麼在進行分區裁剪時,直接把要掃描的HDFS目錄通知Spark的Scan操做符,這樣,Spark在進行掃描時,就能夠直接咔嚓掉其餘的分區數據了。可是,要完成這種優化,須要SparkSql的語義分析邏輯可以正確的分析出Sql語句所要表達的精確目的,因此分區字段在SparkSql的元數據中也是獨立於其餘普通字段,進行了單獨的標示,就是爲了方便語義分析邏輯能區別處理Sql語句中where條件裏的這種特殊狀況。

更多內容敬請關注 vivo 互聯網技術 微信公衆號

 

注:轉載文章請先與微信號:labs2020 聯繫。

相關文章
相關標籤/搜索