深刻研究Spark SQL的Catalyst優化器(原創翻譯)

Spark SQL是Spark最新和技術最爲複雜的組件之一。它支持SQL查詢和新的DataFrame API。Spark SQL的核心是Catalyst優化器,它以一種新穎的方式利用高級編程語言特性(例如Scala的 模式匹配quasiquotes)來構建可擴展查詢優化器。
咱們最近發佈了一篇關於Spark SQL的 論文,該論文將出如今SIGMOD 2015(由Davies Liu,Joseph K. Bradley,Xiangrui Meng,Tomer Kaftan,Michael J. Franklin和Ali Ghodsi合著)中。在這篇博客文章中,咱們重述該論文的部份內容,解釋Catalyst優化器的內部功能以實現更普遍的應用。
爲了實現Spark SQL,咱們設計了一個新的可擴展優化器Catalyst,它基於Scala中的函數式編程結構。 Catalyst的可擴展設計有兩個目的。首先,咱們但願可以輕鬆地爲Spark SQL添加新的優化技術和功能,尤爲是爲了解決咱們在使用大數據時遇到的各類問題(例如,半結構化數據和高級分析)。其次,咱們但願使外部開發人員可以擴展優化器 - 例如,經過添加數據源特定規則,能夠將過濾或聚合的數據推送到外部存儲系統,或者支持新的數據類型。 Catalyst支持基於規則和基於成本的優化。
Catalyst的核心是使用一個通用庫生成樹並使用規則操做這些樹。在該框架的基礎上,構建了用於關係查詢處理庫(例如表達式,邏輯查詢計劃)和處理執行查詢不一樣階段的幾組規則:分析、邏輯優化、物理計劃和代碼生成,代碼生成將部分查詢編譯爲Java字節碼。對於後者,使用了Scala特性quasiquotes,它能夠很容易地在運行時由組合表達式生成代碼。最後,Catalyst提供了幾個公共擴展點,包括外部數據源和用戶定義的類型。

Catalyst中的主要數據類型是由節點對象組成的樹。 每一個節點都有一個節點類型和零個或多個子節點。 新的節點類型在Scala中定義爲TreeNode類的子類。 這些對象是不可變的,並可使用函數轉換來操做,以下一小節所討論的。
一個簡單的例子,使用很是簡單的表達式語言描述三個節點類:
  • Literal(值:Int):常數值
  • Attribute(名稱:String):輸入行的屬性,例如「x」
  • Add(左:TreeNode,右:TreeNode):兩個表達式的總和。
這些類能夠用來構建樹; 例如,表達式x +(1 + 2)的樹將在Scala代碼中表示以下:
1 Add(Attribute(x), Add(Literal(1), Literal(2)))

規則html

可使用規則來操做樹,這些規則是從一棵樹到另外一棵樹的函數。雖然規則能夠在其輸入樹上運行任意代碼(由於該樹只是一個Scala對象),但最多見的方法是使用一組模式匹配函數來查找和替換具備特定結構的子樹。
模式匹配是許多函數式語言的一個特性,它容許從代數數據類型的潛在嵌套結構中提取值。在Catalyst中,樹提供了一種轉換方法,該方法遞歸地在樹的全部節點上應用模式匹配函數,將每一個模式匹配轉換爲結果。例如,咱們能夠實現一個在常量之間相加???Add操做的規則,以下所示:
1 tree.transform {
2   case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
3 }

將此應用於x +(1 + 2)的樹會產生新的樹x + 3。這裏關鍵是使用了Scala的標準模式匹配語法,它可用於匹配對象的類型和爲提取的值(這裏爲c1和c2)提供名稱。node

傳遞給變換的模式匹配表達式是一個部分函數,​​這意味着它只須要匹配全部輸入樹的子集。 Catalyst將測試規則適用樹的哪些部分,自動跳過並降低到不匹配的子樹。這種能力意味着規則只需對給定適用優化的樹進行推理,而對那些不適用的數不進行推理。所以,當新的操做符新增到系統中時,這些規則不須要修改。
規則(和通常的Scala模式匹配)能夠在同一個變換調用中匹配多個模式,這使得一次實現多個轉換來得很是簡潔。
1 tree.transform {
2   case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
3   case Add(left, Literal(0)) => left
4   case Add(Literal(0), right) => right
5 }

實際上,規則可能須要屢次執行才能徹底轉換樹。Catalyst將規則造成批處理,並執行每一個批處理至固定點,該固定點是樹應用其規則後不發生改變。雖然規則運行到固定點意味着每一個規則是簡單且自包含,但這些規則仍會對樹上產生較大的全局效果。在上面的例子中,重複的應用規則會持續摺疊較大的樹,好比(x + 0)+(3 + 3)。另外一個例子,第一個批處理能夠分析全部屬性指定類型的表達式,而第二批處理可以使用這些類型來進行常量摺疊。在每批處理完畢後,開發人員還能夠對新樹進行規範性檢查(例如,查看全部屬性爲指定類型),這些檢查通常使用遞歸匹配來編寫。git

最後,規則條件及其自己能夠包含任意的Scala代碼。這使得Catalyst比領域特定語言在優化器上更強大,同時保持簡潔特性。
根據經驗,對不可變樹的函數轉換使得整個優化器很是易於推理和調試。規則也支持在優化器中並行化,儘管該特性尚未利用這個。

在Spark SQL中使用Catalyst

Catalyst的通用樹轉換框架分爲四個階段,以下所示:(1)分析解決引用的邏輯計劃,(2)邏輯計劃優化,(3)物理計劃,(4)代碼生成用於編譯部分查詢生成Java字節碼。 在物理規劃階段,Catalyst可能會生成多個計劃並根據成本進行比較。 全部其餘階段徹底是基於規則的。 每一個階段使用不一樣類型的樹節點; Catalyst包括用於表達式、數據類型以及邏輯和物理運算符的節點庫。 這些階段以下所示:

解析

sales」中列的類型,甚至列名是否有效,在查詢表sale元數據以前這些都是未知的。若是不知道它的類型或沒有將它匹配到輸入表(或別名)時,那麼該屬性稱爲未解析。Spark SQL使用Catalyst規則和記錄全部表元數據的Catalog對象來解析這些屬性的。構建具備未綁定屬性和數據類型的「未解析的邏輯計劃」樹後,而後執行如下規則:
一、從Catalog中查找名稱關係
二、將命名屬性(如col)映射到操做符的子項
三、將那些屬性引用相同的值給它們一個惟一的ID(隨後遇到如col=col時能夠進行優化)
四、經過表達式傳遞和強制類型:例如,咱們沒法知道1+col的返回類型,直到解析出col並將其子表達式轉換爲兼容類型。
通過統計,解析器的規則大約有 1000行代碼

邏輯計劃優化

在邏輯優化階段,邏輯計劃應用了標準的基於規則的優化。(基於成本的優化經過規則生成多個計劃,而後計算其成原本執行。)這些優化包括常量摺疊、謂詞下推、項目裁剪、空值傳播、布爾表達式簡化以及其餘規則。總的來講,爲各類狀況添加規則很是簡單。例如,當咱們將固定精度的DECIMAL類型添加到Spark SQL時,咱們想要以較低精度的方式優化DECIMAL的聚合(例如求和和平均值);只須要12行代碼編寫一個規則即可在SUM和AVG表達式中找到該數,而後將它們轉換爲未縮放的64位LONG,而後進行聚合,最後將結果轉換回來。這個規則的簡化版本,只能優化SUM表達式以下所示:
1 object DecimalAggregates extends Rule[LogicalPlan] {
2   /** Maximum number of decimal digits in a Long */
3   val MAX_LONG_DIGITS = 18
4   def apply(plan: LogicalPlan): LogicalPlan = {
5     plan transformAllExpressions {
6       case Sum(e @ DecimalType.Expression(prec, scale))
7           if prec + 10 <= MAX_LONG_DIGITS =>
8         MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) }
9 }

再舉一個例子,一個12行代碼的規則經過簡單的正則表達式將LIKE表達式優化爲String.startsWith或String.contains調用。在規則中使用任意Scala代碼使得這些優化易於表達,而這些規則超越了子樹結構的模式匹配。github

通過統計,邏輯優化規則有 800行代碼

物理計劃

在物理計劃階段,Spark SQL使用邏輯計劃生成一個或多個物理計劃,這個過程採用了匹配Spark執行引擎的物理運算符。而後使用成本模型選擇計劃。目前,基於成本的優化僅用於選擇鏈接算法:對於已知很小的關係,Spark SQL使用Spark中的點對點廣播工具進行廣播鏈接。不過,該框架支持更深刻地使用基於成本的優化,由於可使用規則對整棵樹進行遞歸估計。所以,咱們打算在將來實施更加豐富的基於成本的優化。
物理計劃還執行基於規則的物理優化,例如將管道項目或過濾器合併到一個Spark映射操做中。另外,它能夠將操做從邏輯計劃推送到支持謂詞或項目下推的數據源。咱們將在後面的章節中描述這些數據源的API。
總的來講,物理計劃規則大約有 500行代碼

代碼生成

查詢優化的最後階段涉及生成Java字節碼用於在每臺機器上運行。因爲Spark SQL常常在內存數據集上運行,其中處理受CPU限制,咱們但願Spark SQL支持代碼生成以加快執行速度。儘管如此,代碼生成引擎的構建一般很複雜,特別是編譯器。Catalyst依靠Scala語言的特殊功能quasiquotes來簡化代碼生成。 Quasiquotes容許在Scala語言中對抽象語法樹(AST)進行編程式構建,而後在運行時將其提供給Scala編譯器以生成字節碼。使用Catalyst將表示SQL表達式的樹轉換爲Scala代碼的AST用於描述表達式,而後編譯並運行生成的代碼。
做爲一個簡單的例子,參考第4.2節介紹的Add、Attribute和Literal樹節點能夠寫成(x + y)+1表達式。若是沒有使用代碼生成,這些表達式必須遍歷Add、Attribute和Literal節點樹行走才能解釋每行數據。這會引入大量的分支和虛函數調用,從而減慢執行速度。若是使用了代碼生成,能夠編寫一個函數將特定的表達式樹轉換爲Scala AST,以下所示:
1 def compile(node: Node): AST = node match {
2   case Literal(value) => q"$value"
3   case Attribute(name) => q"row.get($name)"
4   case Add(left, right) => q"${compile(left)} + ${compile(right)}"
5 }

以q開頭的字符串是quasiquotes,雖然它們看起來像字符串,但它們在編譯時由Scala編譯器解析,並表明其代碼的AST。 Quasiquotes用$符號表示法將變量或其餘AST拼接到它們中。例如,文字(1)將成爲1的Scala表達式的AST,而屬性(「x」)變爲row.get(「x」)。最後,相似Add(Literal(1),Attribute(「x」))的樹成爲像1 + row.get(「x」)這樣的Scala表達式的AST。正則表達式

Quasiquotes在編譯時進行類型檢查,以確保只替換合適的AST或文字,使得它們比字符串鏈接更有用,而且直接生成Scala AST,而非在運行時運行Scala語法分析器。此外,它們是高度可組合的,由於每一個節點的代碼生成規則不須要知道其子節點返回的樹是如何構建的。最後,若是Catalyst缺乏表達式級別的優化,則由Scala編譯器對結果代碼進行進一步優化。下圖顯示quasiquotes生成代碼其性能相似於手動優化的程序。
咱們發現quasiquotes很是接近於代碼生成,而且發現即便是Spark SQL的新貢獻者也能夠快速爲新類型的表達式添加規則。 Quasiquotes也適用於在本地Java對象上運行的目標:當從這些對象訪問字段時,能夠直接訪問所需字段,而沒必要將對象複製成Spark SQL 行,並使用行訪問器方法。最後,將代碼生成的評估與對還沒有生成代碼的表達式的解釋評估結合起來很簡單,由於編譯的Scala代碼能夠直接使用到表達式解釋器中。
Catalyst生成器總共有大約 700行代碼
這篇博客文章介紹了Spark SQL的Catalyst優化器內部原理。 經過這種新穎、簡單的設計使Spark社區可以快速創建原型、實現和擴展引擎。 你能夠在這裏 閱讀其他的論文。 
您還能夠從如下內容中找到有關Spark SQL的更多信息:
 
 
相關文章
相關標籤/搜索