是時候學習真正的 spark 技術了

spark sql 能夠說是 spark 中的精華部分了,我感受總體複雜度是 spark streaming 的 5 倍以上,如今 spark 官方主推 structed streaming, spark streaming  維護的也不積極了, 咱們基於 spark 來構建大數據計算任務,重心也要向 DataSet 轉移,原來基於 RDD 寫的代碼遷移過來,好處是很是大的,尤爲是在性能方面,有質的提高,  spark sql 中的各類內嵌的性能優化是比人裸寫 RDD 遵照各類所謂的最佳實踐更靠譜的,尤爲對新手來說, 好比有些最佳實踐講到先 filter 操做再 map 操做,這種 spark sql 中會自動進行謂詞下推,好比儘可能避免使用 shuffle 操做,spark sql 中若是你開啓了相關的配置,會自動使用 broadcast join 來廣播小表,把 shuffle join 轉化爲 map join 等等,真的能讓咱們省不少心。算法

spark sql 的代碼複雜度是問題的本質複雜度帶來的,spark sql 中的 Catalyst 框架大部分邏輯是在一個 Tree 類型的數據結構上作各類折騰,基於 scala 來實現仍是很優雅的,scala 的偏函數和強大的 Case 正則匹配,讓整個代碼看起來仍是清晰的, 這篇文章簡單的描述下 spark sql 中的一些機制和概念。sql

SparkSession 是咱們編寫 spark 應用代碼的入口,啓動一個 spark-shell 會提供給你一個建立 SparkSession, 這個對象是整個 spark 應用的起始點,咱們來看下 sparkSession 的一些重要的變量和方法:shell

數據庫

上面提到的 sessionState 是一個很關鍵的東西,維護了當前 session 使用的全部的狀態數據,有如下各類須要維護的東西:性能優化

spark sql 內部使用 dataFrame 和 Dataset 來表示一個數據集合,而後你能夠在這個數據集合上應用各類統計函數和算子,有人可能對  DataFrame 和 Dataset 分不太清,其實 DataFrame 就是一種類型爲 Row 的 DataSet,網絡

type DataFrame = Dataset[Row]session

這裏說的 Row 類型在 Spark sql 對外暴露的 API 層面來講的, 然而 DataSet 並不要求輸入類型爲 Row,也能夠是一種強類型的數據,DataSet 底層處理的數據類型爲 Catalyst 內部 InternalRow 或者 UnsafeRow 類型, 背後有一個 Encoder 進行隱式轉換,把你輸入的數據轉換爲內部的 InternalRow,那麼這樣推論,DataFrame 就對應 RowEncoder。數據結構

在 Dataset 上進行 transformations 操做就會生成一個元素爲 LogicalPlan 類型的樹形結構, 咱們來舉個例子,假如我有一張學生表,一張分數表,需求是統計全部大於 11 歲的學生的總分。框架

ide

這個 queryExecution 就是整個執行計劃的執行引擎, 裏面有執行過程當中,各個中間過程變量,整個執行流程以下

那麼咱們上面例子中的 sql 語句通過 Parser 解析後就會變成一個抽象語法樹,對應解析後的邏輯計劃 AST 爲

形象一點用圖來表示

咱們能夠看到過濾條件變爲了 Filter 節點,這個節點是 UnaryNode 類型, 也就是隻有一個孩子,兩個表中的數據變爲了 UnresolvedRelation 節點,這個節點是 LeafNode 類型, 顧名思義,葉子節點, JOIN 操做就表位了 Join 節點, 這個是一個 BinaryNode 節點,有兩個孩子。

上面說的這些節點都是 LogicalPlan 類型的, 能夠理解爲進行各類操做的 Operator, spark sql 對應各類操做定義了各類 Operator。

這些 operator 組成的抽象語法樹就是整個 Catatyst 優化的基礎,Catatyst 優化器會在這個樹上面進行各類折騰,把樹上面的節點挪來挪去來進行優化。

如今通過 Parser 有了抽象語法樹,可是並不知道 score,sum 這些東西是啥,因此就須要 analyer 來定位, analyzer 會把 AST 上全部 Unresolved 的東西都轉變爲 resolved 狀態,sparksql 有不少resolve 規則,都很好理解,例如 ResolverRelations 就是解析表(列)的基本類型等信息,ResolveFuncions 就是解析出來函數的基本信息,好比例子中的sum 函數,ResolveReferences 可能不太好理解,咱們在 sql 語句中使用的字段好比 Select name 中的 name 對應一個變量, 這個變量在解析表的時候就做爲一個變量(Attribute 類型)存在了,那麼 Select 對應的 Project 節點中對應的相同的變量就變成了一個引用,他們有相同的 ID,因此通過 ResolveReferences 處理後,就變成了 AttributeReference 類型   ,保證在最後真正加載數據的時候他們被賦予相同的值,就跟咱們寫代碼的時候定義一個變量同樣,這些 Rule 就反覆做用在節點上,指定樹節點趨於穩定,固然優化的次數多了會浪費性能,因此有的 rule  做用 Once, 有的 rule 做用 FixedPoint, 這都是要取捨的。好了, 不說廢話,咱們作個小實驗。

咱們使用 ResolverRelations 對咱們的 AST 進行解析,解析後能夠看到原來的 UnresolvedRelation 變成了 LocalRelation,這個表示一個本地內存中的表,這個表是咱們使用 createOrReplaceTempView 的時候註冊在 catalog 中的,這個 relove 操做無非就是在 catalog 中查表,找出這個表的 schema, 並且解析出來相應的字段,把外層用戶定義的 各個 StructField 轉變爲 AttibuteReference,使用 ID 進行了標記。

咱們再使用 ResolveReferences 來搞一下,你會發現上層節點中的相同的字段都變成了擁有相同 ID 的引用,他們的類型都是 AttibuteReference。最終全部的 rule 都應用後,整個 AST 就變爲了

下面重點來了,要進行邏輯優化了,咱們看下邏輯優化有哪些:

sparksql 中的邏輯優化種類繁多,spark sql 中的 Catalyst 框架大部分邏輯是在一個 Tree 類型的數據結構上作各類折騰,基於 scala 來實現仍是很優雅的,scala 的偏函數 和 強大的 Case 正則匹配,讓整個代碼看起來仍是清晰的,廢話少說,咱們來搞個小實驗。

看到了沒,把個人 (100 + 10) 換成了 110。

使用 PushPredicateThroughJoin 把一個單單對 stu 表作過濾的 Filter 給下推到 Join 以前了,會少加載不少數據,性能獲得了優化,咱們來看下最終的樣子。

至少用了 ColumnPruning,PushPredicateThroughJoin,ConstantFolding,RemoveRedundantAliases 邏輯優化手段,如今個人小樹變成了:

作完邏輯優化,畢竟只是抽象的邏輯層,還須要先轉換爲物理執行計劃,將邏輯上可行的執行計劃變爲 Spark 能夠真正執行的計劃。

spark sql 把邏輯節點轉換爲了相應的物理節點, 好比 Join 算子,Spark 根據不一樣場景爲該算子制定了不一樣的算法策略,有BroadcastHashJoin、ShuffleHashJoin 以及 SortMergeJoin 等, 固然這裏面有不少優化的點,spark 在轉換的時候會根據一些統計數據來智能選擇,這就涉及到基於代價的優化,這也是很大的一塊,後面能夠開一篇文章單講, 咱們例子中的因爲數據量小於 10M, 自動就轉爲了 BroadcastHashJoin,眼尖的同窗能夠看到好像多了一些節點,咱們來解釋下, BroadcastExchange 節點繼承 Exchage 類,用來在節點間交換數據,這裏的BroadcastExchange 就是會把 LocalTableScan出來的數據 broadcast 到每一個 executor 節點,用來作 map-side join。最後的 Aggregate 操做被分爲了兩步,第一步先進行並行聚合,而後對聚合後的結果,再進行 Final 聚合,這個就相似域名 map-reduce  裏面的 combine 和最後的 reduce, 中間加上了一個 Exchange hashpartitioning, 這個是爲了保證相同的 key shuffle 到相同的分區,當前物理計劃的 Child 輸出數據的 Distribution 達不到要求的時候須要進行Shuffle,這個是在最後的 EnsureRequirement 階段插入的交換數據節點,在數據庫領域裏面,有那麼一句話,叫得 join 者得天下,咱們重點講一些 spark sql 在 join 操做的時候作的一些取捨。

Join 操做基本上能上會把兩張 Join 的表分爲大表和小表,大表做爲流式遍歷表,小表做爲查找表,而後對大表中的每一條記錄,根據 Key 來取查找表中取相同 Key 的記錄。

spark 支持全部類型的 Join:

spark sql 中 join 操做根據各類條件選擇不一樣的 join 策略,分爲 BroadcastHashJoin, SortMergeJoin, ShuffleHashJoin。

  • BroadcastHashJoin:spark 若是判斷一張表存儲空間小於 broadcast 閾值時(Spark 中使用參數 spark.sql.autoBroadcastJoinThreshold 來控制選擇 BroadcastHashJoin 的閾值,默認是 10MB),就是把小表廣播到 Executor, 而後把小表放在一個 hash 表中做爲查找表,經過一個 map 操做就能夠完成 join 操做了,避免了性能代碼比較大的 shuffle 操做,不過要注意, BroadcastHashJoin 不支持 full outer join, 對於 right outer join, broadcast 左表,對於 left outer join,left semi join,left anti join ,broadcast 右表, 對於 inner join,那個表小就 broadcast 哪一個。

  • SortMergeJoin:若是兩個表的數據都很大,比較適合使用 SortMergeJoin,  SortMergeJoin 使用shuffle 操做把相同 key 的記錄 shuffle 到一個分區裏面,而後兩張表都是已經排過序的,進行 sort merge 操做,代價也能夠接受。

  • ShuffleHashJoin:就是在 shuffle 過程當中不排序了,把查找表放在hash表中來進行查找 join,那何時會進行 ShuffleHashJoin 呢?查找表的大小須要超過 spark.sql.autoBroadcastJoinThreshold 值,否則就使用  BroadcastHashJoin 了,每一個分區的平均大小不能超過   spark.sql.autoBroadcastJoinThreshold ,這樣保證查找表能夠放在內存中不 OOM, 還有一個條件是 大表是小表的 3 倍以上,這樣才能發揮這種 Join 的好處。

上面提到 AST 上面的節點已經轉換爲了物理節點,這些物理節點最終從頭節點遞歸調用 execute 方法,裏面會在 child 生成的 RDD 上調用 transform操做就會產生一個串起來的 RDD 鏈, 就跟在 spark stremaing 裏面在 DStream 上面遞歸調用那樣。最後執行出來的圖以下:

能夠看到這個最終執行的時候分分紅了兩個 stage, 把小表 broeadcastExechage 到了大表上作 BroadcastHashJoin, 沒有進化 shuffle 操做,而後最後一步聚合的時候,先在 map 段進行了一次 HashAggregate sum 函數, 而後 Exchage 操做根據 name 把相同 key 的數據 shuffle 到同一個分區,而後作最終的 HashAggregate sum 操做,這裏有個 WholeStageCodegen 比較奇怪,這個是幹啥的呢,由於咱們在執行 Filter ,Project 這些 operator 的時候,這些 operator 內部包含不少  Expression, 好比 SELECT sum(v),name, 這裏的 sum 和 v 都是 Expression,這裏面的 v 屬於 Attribute 變量表達式,表達式也是樹形數據結構,sum(v)  就是 sum 節點和 sum 的子節點 v 組成的一個樹形結構,這些表達式都是能夠求值和生成代碼的,表達式最基本的功能就是求值,對輸入的 Row 進行計算 , Expression 須要實現 def eval(input: InternalRow = null): Any 函數來實現它的功能。

表達式是對 Row 進行加工,輸出的能夠是任意類型,可是 Project 和 Filter 這些 Plan 輸出的類型是 def output: Seq[Attribute], 這個就是表明一組變量,好比咱們例子中的 Filter (age >= 11) 這個plan, 裏面的 age>11 就是一個表達式,這個 > 表達式依賴兩個子節點, 一個Literal常量表達式求值出來就是 11, 另一個是 Attribute 變量表達式 age, 這個變量在 analyze 階段轉變爲了 AttributeReference 類型,可是它是Unevaluable,爲了獲取屬性在輸入 Row 中對應的值, 還得根據 schema 關聯綁定一下這個變量在一行數據的 index, 生成 BoundReference,而後 BoundReference 這種表達式在 eval 的時候就能夠根據 index 來獲取 Row 中的值。  age>11 這個表達式最終輸出類型爲 boolean 類型,可是 Filter 這個 Plan 輸出類型是 Seq[Attribute] 類型。

能夠想象到,數據在一個一個的 plan 中流轉,而後每一個 plan 裏面表達式都會對數據進行處理,就至關於通過了一個個小函數的調用處理,這裏面就有大量的函數調用開銷,那麼咱們是否是能夠把這些小函數內聯一下,當成一個大函數,WholeStageCodegen 就是幹這事的。

能夠看到最終執行計劃每一個節點前面有個 * 號,說明整段代碼生成被啓用,在咱們的例子中,Filter, Project,BroadcastHashJoin,Project,HashAggregate 這一段都啓用了整段代碼生成,級聯爲了兩個大函數,有興趣可使用 a.queryExecution.debug.codegen 看下生成後的代碼長什麼樣子。然而 Exchange 算子並無實現整段代碼生成,由於它須要經過網絡發送數據。

我今天的分享就到這裏,其實 spark sql 裏面有不少有意思的東西,可是由於問題的本質複雜度,致使須要高度抽象才能把這一切理順,這樣就給代碼閱讀者帶來了理解困難, 可是你若是真正看進去了,就會有不少收穫。若是對本文有任何看法,歡迎在文末留言說出你的想法。

牛人說

「牛人說」專欄致力於技術人思想的發現,其中包括技術實踐、技術乾貨、技術看法、成長心得,還有一切值得被發現的內容。咱們但願集合最優秀的技術人,挖掘獨到、犀利、具備時代感的聲音。

投稿郵箱:marketing@qiniu.com

 

相關文章
相關標籤/搜索