sparkSQL1.1

http://blog.csdn.net/bluejoe2000/article/details/41247857html

2014年9月11日,Spark1.1.0突然之間發佈。筆者當即下載、編譯、部署了Spark1.1.0。關於Spark1.1的編譯和部署,請參看筆者博客 Spark1.1.0 源碼編譯和部署包生成 。
      Spark1.1.0中變化較大是sparkSQL和MLlib,sparkSQL1.1.0主要的變更有:
  • 增長了JDBC/ODBC Server(ThriftServer),用戶能夠在應用程序中鏈接到SparkSQL並使用其中的表和緩存表。
  • 增長了對JSON文件的支持
  • 增長了對parquet文件的本地優化
  • 增長了支持將python、scala、java的lambda函數註冊成UDF,並能在SQL中直接引用
  • 引入了動態字節碼生成技術(bytecode generation,即CG),明顯地提高了複雜表達式求值查詢的速率。
  • 統一API接口,如sql()、SchemaRDD生成等。
  • ......
      下面分十個小節來介紹sparkSQL1.1.0的架構和使用,但願各位讀者joy it!


第一節:爲何sparkSQL   爲本篇,介紹sparkSQL的發展歷程和性能
第二節:sparkSQL架構   介紹catalyst,而後介紹sqlContext、hiveContext的運行架構及區別
第三節:sparkSQL組件之解析   介紹sparkSQL運行架構中的各個組件的功能和實現
第四節:深刻了解sparkSQL之運行   使用hive/console更深刻了解各類計劃是如何生成的
第五節:測試環境之搭建   介紹後面章節將使用的環境搭建和測試數據
第六節:sparkSQL之基礎應用   介紹sqlContext的RDD、Json、parquet使用以及hiveContext使用
第七節:ThriftServer和CLI   介紹TriftServer和CLI的使用,以及如何使用JDBC訪問sparkSQL數據
第八節:sparkSQL之綜合應用   介紹sparkSQL和MLlib、sparkSQL和GraphX結合使用
第九節:sparkSQL之調優   介紹CG、壓縮、序化器、緩存之使用
第十節:總結
至於與hive的兼容性、具體的SQL語法之後有機會再介紹。

一:爲何sparkSQL?

1:sparkSQL的發展歷程。
A:hive and shark
      sparkSQL的前身是shark。在hadoop發展過程當中,爲了給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,hive應運而生,是當時惟一運行在hadoop上的SQL-on-Hadoop工具。可是,MapReduce計算過程當中大量的中間磁盤落地過程消耗了大量的I/O,下降的運行效率,爲了提升SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產生,其中表現較爲突出的是:
  • MapR的Drill
  • Cloudera的Impala
  • Shark
      其中Shark是伯克利實驗室spark生態環境的組件之一,它修改了下圖所示的右下角的內存管理、物理計劃、執行三個模塊,並使之能運行在spark引擎上,從而使得SQL查詢的速度獲得10-100倍的提高。


B:Shark和sparkSQL       
      可是,隨着Spark的發展,對於野心勃勃的Spark團隊來講,Shark對於hive的太多依賴(如採用hive的語法解析器、查詢優化器等等),制約了Spark的One Stack rule them all的既定方針,制約了spark各個組件的相互集成,因此提出了sparkSQL項目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優勢,如內存列存儲(In-Memory Columnar Storage)、Hive兼容性等,從新開發了SparkSQL代碼;因爲擺脫了對hive的依賴性,SparkSQL不管在數據兼容、性能優化、組件擴展方面都獲得了極大的方便,真可謂「退一步, 海闊天空」。
  • 數據兼容方面  不但兼容hive,還能夠從RDD、parquet文件、JSON文件中獲取數據,將來版本甚至支持獲取RDBMS數據以及cassandra等NOSQL數據
  • 性能優化方面  除了採起In-Memory Columnar Storage、byte-code generation等優化技術外、將會引進Cost Model對查詢進行動態評估、獲取最佳物理計劃等等
  • 組件擴展方面  不管是SQL的語法解析器、分析器仍是優化器均可以從新定義,進行擴展
      2014年6月1日,Shark項目和SparkSQL項目的主持人Reynold Xin宣佈:中止對Shark的開發,團隊將全部資源放sparkSQL項目上,至此,Shark的發展畫上了句話,但也所以發展出兩個直線:SparkSQL和hive on spark。

其中sparkSQL做爲Spark生態的一員繼續發展,而再也不受限於hive,只是兼容hive;而hive on spark是一個hive的發展計劃,該計劃將spark做爲hive的底層引擎之一,也就是說,hive將再也不受限於一個引擎,能夠採用map-reduce、Tez、spark等引擎。

2:sparkSQL的性能
      shark的出現,使得SQL-on-Hadoop的性能比hive有了10-100倍的提升:

       那麼,擺脫了hive的限制,sparkSQL的性能又有怎麼樣的表現呢?雖然沒有shark相對於hive那樣矚目地性能提高,但也表現得很是優異:

       爲何sparkSQL的性能會獲得怎麼大的提高呢?主要sparkSQL在下面幾點作了優化:
A:內存列存儲(In-Memory Columnar Storage)
      sparkSQL的表數據在內存中存儲不是採用原生態的JVM對象存儲方式,而是採用內存列存儲,以下圖所示。

      該存儲方式不管在空間佔用量和讀取吞吐率上都佔有很大優點。
      對於原生態的JVM對象存儲方式,每一個對象一般要增長12-16字節的額外開銷,對於一個270MB的TPC-H lineitem table數據,使用這種方式讀入內存,要使用970MB左右的內存空間(一般是2~5倍於原生數據空間);另外,使用這種方式,每一個數據記錄產生一個JVM對象,若是是大小爲200B的數據記錄,32G的堆棧將產生1.6億個對象,這麼多的對象,對於GC來講,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關)。顯然這種內存存儲方式對於基於內存計算的spark來講,很昂貴也負擔不起。
      對於內存列存儲來講,將全部原生數據類型的列採用原生數組來存儲,將Hive支持的複雜數據類型(如array、map等)先序化後並接成一個字節數組來存儲。這樣,每一個列建立一個JVM對象,從而致使能夠快速的GC和緊湊的數據存儲;額外的,還可使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)下降內存開銷;更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會獲得很大的提升,緣由就是這些列的數據放在一塊兒,更容易讀入內存進行計算。

B:字節碼生成技術(bytecode generation,即CG)
      在數據庫查詢中有一個昂貴的操做是查詢語句中的表達式,主要是因爲JVM的內存模型引發的。好比以下一個查詢:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. SELECT a + b FROM table  
      在這個查詢裏,若是採用通用的SQL語法途徑去處理,會先生成一個表達式樹(有兩個節點的Add樹,參考後面章節),在物理處理這個表達式樹的時候,將會如圖所示的7個步驟:
  1. 調用虛函數Add.eval(),須要確認Add兩邊的數據類型
  2. 調用虛函數a.eval(),須要確認a的數據類型
  3. 肯定a的數據類型是Int,裝箱
  4. 調用虛函數b.eval(),須要確認b的數據類型
  5. 肯定b的數據類型是Int,裝箱
  6. 調用Int類型的Add
  7. 返回裝箱後的計算結果
其中屢次涉及到虛函數的調用,虛函數的調用會打斷CPU的正常流水線處理,減緩執行。
      Spark1.1.0在catalyst模塊的expressions增長了codegen模塊,若是使用動態字節碼生成技術(配置spark.sql.codegen參數),sparkSQL在執行物理計劃的時候,對匹配的表達式採用特定的代碼,動態編譯,而後運行。如上例子,匹配到Add方法:

而後,經過調用,最終調用:

最終實現效果相似以下僞代碼:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. val a: Int = inputRow.getInt(0)  
  2. val b: Int = inputRow.getInt(1)  
  3. val result: Int = a + b  
  4. resultRow.setInt(0, result)  
對於Spark1.1.0,對SQL表達式都做了CG優化,具體能夠參看codegen模塊。CG優化的實現主要仍是依靠scala2.10的運行時放射機制(runtime reflection)。對於SQL查詢的CG優化,能夠簡單地用下圖來表示:

 
C:scala代碼優化
      另外,sparkSQL在使用Scala編寫代碼的時候,儘可能避免低效的、容易GC的代碼;儘管增長了編寫代碼的難度,但對於用戶來講,仍是使用統一的接口,沒受到使用上的困難。下圖是一個scala代碼優化的示意圖:

 二:sparkSQL運行架構

在介紹sparkSQL以前,咱們首先來看看,傳統的關係型數據庫是怎麼運行的。當咱們提交了一個很簡單的查詢:

[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. SELECT  a1,a2,a3  FROM  tableA  Where  condition   

能夠看得出來,該語句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)組成,分別對應sql查詢過程當中的Result、Data Source、Operation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。那麼,SQL語句在實際的運行過程當中是怎麼處理的呢?通常的數據庫系統先將讀入的SQL語句(Query)先進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表達式、哪些是Projection、哪些是Data Source等等。這一步就能夠判斷SQL語句是否規範,不規範就報錯,規範就繼續下一步過程綁定(Bind),這個過程將SQL語句和數據庫的數據字典(列、表、視圖等等)進行綁定,若是相關的Projection、Data Source等等都是存在的話,就表示這個SQL語句是能夠執行的;而在執行前,通常的數據庫會提供幾個執行計劃,這些計劃通常都有運行統計數據,數據庫會在這些計劃中選擇一個最優計劃(Optimize),最終執行該計劃(Execute),並返回結果。固然在實際的執行過程當中,是按Operation-->Data Source-->Result的次序來進行的,和SQL語句的次序恰好相反;在執行過程有時候甚至不須要讀取物理表就能夠返回結果,好比從新運行剛運行過的SQL語句,可能直接從數據庫的緩衝池中獲取返回結果。
      以上過程看上去很是簡單,但實際上會包含不少複雜的操做細節在裏面。而這些操做細節都和Tree有關,在數據庫解析(Parse)SQL語句的時候,會將SQL語句轉換成一個樹型結構來進行處理,以下面一個查詢,會造成一個含有多個節點(TreeNode)的Tree,而後在後續的處理過程當中對該Tree進行一系列的操做。

下圖給出了對Tree的一些可能的操做細節,對於Tree的處理過程當中所涉及更多的細節,能夠查看相關的數據庫論文。


 OK,上面簡單介紹了關係型數據庫的運行過程,那麼,sparkSQL是否是也採用相似的方式處理呢?答案是確定的。下面咱們先來看看sparkSQL中的兩個重要概念Tree和Rule、而後再介紹一下sparkSQL的兩個分支sqlContext和hiveContext、最後再綜合看看sparkSQL的優化器Catalyst。

1:Tree和Rule
      sparkSQL對SQL語句的處理和關係型數據庫對SQL語句的處理採用了相似的方法,首先會將SQL語句進行解析(Parse),而後造成一個Tree,在後續的如綁定、優化等處理過程都是對Tree的操做,而操做的方法是採用Rule,經過模式匹配,對不一樣類型的節點採用不一樣的操做。
A:Tree
  • Tree的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
  • Logical Plans、Expressions、Physical Operators均可以使用Tree表示
  • Tree的具體操做是經過TreeNode來實現的
    • sparkSQL定義了catalyst.trees的日誌,經過這個日誌能夠形象的表示出樹的結構
    • TreeNode可使用scala的集合操做方法(如foreach, map, flatMap, collect等)進行操做
    • 有了TreeNode,經過Tree中各個TreeNode之間的關係,能夠對Tree進行遍歷操做,如使用transformDown、transformUp將Rule應用到給定的樹段,而後用結果替代舊的樹段;也可使用transformChildrenDown、transformChildrenUp對一個給定的節點進行操做,經過迭代將Rule應用到該節點以及子節點。

  • TreeNode能夠細分紅三種類型的Node:
    • UnaryNode 一元節點,即只有一個子節點。如Limit、Filter操做
    • BinaryNode 二元節點,即有左右子節點的二叉節點。如Jion、Union操做
    • LeafNode 葉子節點,沒有子節點的節點。主要用戶命令類操做,如SetCommand


B:Rule
  • Rule的相關代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
  • Rule在sparkSQL的Analyzer、Optimizer、SparkPlan等各個組件中都有應用到
  • Rule是一個抽象類,具體的Rule實現是經過RuleExecutor完成
  • Rule經過定義batch和batchs,能夠簡便的、模塊化地對Tree進行transform操做
  • Rule經過定義Once和FixedPoint,能夠對Tree進行一次操做或屢次操做(如對某些Tree進行屢次迭代操做的時候,達到FixedPoint次數迭代或達到先後兩次的樹結構沒變化才中止操做,具體參看RuleExecutor.apply)

      拿個簡單的例子,在處理由解析器(SqlParse)生成的LogicPlan Tree的時候,在Analyzer中就定義了多種Rules應用到LogicPlan Tree上。
      應用示意圖:

      Analyzer中使用的Rules,定義了batches,由多個batch構成,如MultiInstanceRelations、Resolution、Check Analysis、AnalysisOperators等構成;每一個batch又有不一樣的rule構成,如Resolution由ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等構成;每一個rule又有本身相對應的處理函數,能夠具體參看Analyzer中的ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances函數;同時要注意的是,不一樣的rule應用次數是不一樣的:如CaseInsensitiveAttributeReferences這個batch中rule只應用了一次(Once),而Resolution這個batch中的rule應用了屢次(fixedPoint = FixedPoint(100),也就是說最多應用100次,除非先後迭代結果一致退出)。

在整個sql語句的處理過程當中,Tree和Rule相互配合,完成了解析、綁定(在sparkSQL中稱爲Analysis)、優化、物理計劃等過程,最終生成能夠執行的物理計劃。
知道了sparkSQL的各個過程的基本處理方式,下面來看看sparkSQL的運行過程。sparkSQL有兩個分支,sqlContext和hivecontext,sqlContext如今只支持sql語法解析器(SQL-92語法);hiveContext如今支持sql語法解析器和hivesql語法解析器,默認爲hivesql語法解析器,用戶能夠經過配置切換成sql語法解析器,來運行hiveql不支持的語法,如select 1。關於sqlContext和hiveContext的具體應用請參看第六部分。

2:sqlContext的運行過程
      sqlContext是使用sqlContext.sql(sqlText)來提交用戶sql語句:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */  
  2.   def sql(sqlText: String): SchemaRDD = {  
  3.     if (dialect == "sql") {  
  4.       new SchemaRDD(this, parseSql(sqlText))   //parseSql(sqlText)對sql語句進行語法解析  
  5.     } else {  
  6.       sys.error(s"Unsupported SQL dialect: $dialect")  
  7.     }  
  8.   }  
sqlContext.sql的返回結果是SchemaRDD,調用了new SchemaRDD(this, parseSql(sqlText)) 來對sql語句進行處理,處理以前先使用catalyst.SqlParser對sql語句進行語法解析,使之生成Unresolved LogicalPlan。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */  
  2.   protected[sql] val parser = new catalyst.SqlParser  
  3.   protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)  
SchemaRDD繼承自 SchemaRDDLike

[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  */  
  2. class SchemaRDD(  
  3.     @transient val sqlContext: SQLContext,  
  4.     @transient val baseLogicalPlan: LogicalPlan)  
  5.   extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike  
SchemaRDDLike中調用sqlContext.executePlan(baseLogicalPlan)來執行catalyst.SqlParser解析後生成Unresolved LogicalPlan,這裏的baseLogicalPlan就是指Unresolved LogicalPlan。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala  */  
  2. private[sql] trait SchemaRDDLike {  
  3.   @transient val sqlContext: SQLContext  
  4.   @transient val baseLogicalPlan: LogicalPlan  
  5.   private[sql] def baseSchemaRDD: SchemaRDD  
  6.   
  7.   lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)  
sqlContext.executePlan作了什麼呢?它調用了 QueryExecution類
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */  
  2. protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =  
  3.     new this.QueryExecution { val logical = plan }  
QueryExecution類的定義:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */  
  2. protected abstract class QueryExecution {  
  3.     def logical: LogicalPlan  
  4.      
  5.     //對Unresolved LogicalPlan進行analyzer,生成resolved LogicalPlan  
  6.     lazy val analyzed = ExtractPythonUdfs(analyzer(logical))      
  7.     //對resolved LogicalPlan進行optimizer,生成optimized LogicalPlan  
  8.     lazy val optimizedPlan = optimizer(analyzed)    
  9.     // 將optimized LogicalPlan轉換成PhysicalPlan  
  10.     lazy val sparkPlan = {  
  11.       SparkPlan.currentContext.set(self)  
  12.       planner(optimizedPlan).next()  
  13.     }  
  14.     // PhysicalPlan執行前的準備工做,生成可執行的物理計劃  
  15.     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)  
  16.   
  17.     //執行可執行物理計劃  
  18.     lazy val toRdd: RDD[Row] = executedPlan.execute()  
  19.   
  20.     ......  
  21.   }  
sqlContext總的一個過程以下圖所示:
  1. SQL語句通過SqlParse解析成UnresolvedLogicalPlan;
  2. 使用analyzer結合數據數據字典(catalog)進行綁定,生成resolvedLogicalPlan;
  3. 使用optimizer對resolvedLogicalPlan進行優化,生成optimizedLogicalPlan;
  4. 使用SparkPlan將LogicalPlan轉換成PhysicalPlan;
  5. 使用prepareForExecution()將PhysicalPlan轉換成可執行物理計劃;
  6. 使用execute()執行可執行物理計劃;
  7. 生成SchemaRDD。
在整個運行過程當中涉及到多個sparkSQL的組件,如SqlParse、 analyzer、 optimizer、 SparkPlan等等,其功能和 實如今下一章節中詳解。



3:hiveContext的運行過程
      在分佈式系統中,因爲歷史緣由,不少數據已經定義了hive的元數據,經過這些hive元數據,sparkSQL使用hiveContext很容易實現對這些數據的訪問。值得注意的是hiveContext繼承自sqlContext,因此在hiveContext的的運行過程當中除了override的函數和變量,可使用和sqlContext同樣的函數和變量。
      從sparkSQL1.1開始,hiveContext使用hiveContext.sql(sqlText)來提交用戶sql語句進行查詢:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */  
  2. override def sql(sqlText: String): SchemaRDD = {  
  3.     // 使用spark.sql.dialect定義採用的語法解析器  
  4.     if (dialect == "sql") {  
  5.       super.sql(sqlText)    //若是使用sql解析器,則使用sqlContext的sql方法  
  6.     } else if (dialect == "hiveql") {     //若是使用和hiveql解析器,則使用HiveQl.parseSql  
  7.       new SchemaRDD(this, HiveQl.parseSql(sqlText))  
  8.     }  else {  
  9.       sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")  
  10.     }  
  11.   }  
hiveContext.sql首先根據用戶的語法設置(spark.sql.dialect)決定具體的執行過程,若是dialect == "sql"則採用sqlContext的sql語法執行過程;若是是dialect == "hiveql",則採用hiveql語法執行過程。在這裏咱們主要看看hiveql語法執行過程。能夠看出,hiveContext.sql調用了new SchemaRDD(this, HiveQl.parseSql(sqlText))對hiveql語句進行處理,處理以前先使用對語句進行語法解析。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */  
  2.   /** Returns a LogicalPlan for a given HiveQL string. */  
  3.   def parseSql(sql: String): LogicalPlan = {  
  4.     try {  
  5.       if (條件)   {  
  6.       //非hive命令的處理,如set、cache table、add jar等直接轉化成command類型的LogicalPlan  
  7.       .....    
  8.       } else {  
  9.         val tree = getAst(sql)  
  10.         if (nativeCommands contains tree.getText) {  
  11.           NativeCommand(sql)  
  12.         } else {  
  13.           nodeToPlan(tree) match {  
  14.             case NativePlaceholder => NativeCommand(sql)  
  15.             case other => other  
  16.           }  
  17.         }  
  18.       }  
  19.     } catch {  
  20.       //異常處理  
  21.       ......  
  22.     }  
  23.   }  
由於sparkSQL所支持的hiveql除了兼容hive語句外,還兼容一些sparkSQL自己的語句,因此在HiveQl.parseSql對hiveql語句語法解析的時候:
  • 首先考慮一些非hive語句的處理,這些命令屬於sparkSQL自己的命令語句,如設置sparkSQL運行參數的set命令、cache table、add jar等,將這些語句轉換成command類型的LogicalPlan;
  • 若是是hive語句,則調用getAst(sql)使用hive的ParseUtils將該語句先解析成AST樹,而後根據AST樹中的關鍵字進行轉換:相似命令型的語句、DDL類型的語句轉換成command類型的LogicalPlan;其餘的轉換經過nodeToPlan轉換成LogicalPlan。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */    
  2. /** * Returns the AST for the given SQL string.    */  
  3.   def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))  
和sqlContext同樣,類SchemaRDD繼承自SchemaRDDLike,SchemaRDDLike調用sqlContext.executePlan(baseLogicalPlan),不過hiveContext重寫了executePlan()函數:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */   
  2. override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =  
  3.     new this.QueryExecution { val logical = plan }  
並使用了一個繼承自sqlContext. QueryExecution的 新的 QueryExecution類
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */  
  2.  protected[sql] abstract class QueryExecution extends super.QueryExecution {  
  3.     // TODO: Create mixin for the analyzer instead of overriding things here.  
  4.     override lazy val optimizedPlan =  
  5.       optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))  
  6.   
  7.     override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())  
  8.     ......  
  9.   }  
因此在hiveContext的運行過程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
hiveContext的catalog,是指向 Hive Metastore:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */  
  2. /* A catalyst metadata catalog that points to the Hive Metastore. */  
  3.   @transient  
  4.   override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {  
  5.     override def lookupRelation(  
  6.       databaseName: Option[String],  
  7.       tableName: String,  
  8.       alias: Option[String] = None): LogicalPlan = {  
  9.   
  10.       LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))  
  11.     }  
  12.   }  
hiveContext的analyzer,使用了新的catalog和functionRegistry:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */  
  2.   /* An analyzer that uses the Hive metastore. */  
  3.   @transient  
  4.   override protected[sql] lazy val analyzer =  
  5.     new Analyzer(catalog, functionRegistry, caseSensitive = false)  
hiveContext的planner,使用新定義的hivePlanner:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */  
  2.   @transient  
  3.   override protected[sql] val planner = hivePlanner  
因此hiveContext總的一個過程以下圖所示:
  1. SQL語句通過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個解析過程當中對hiveql語句使用getAst()獲取AST樹,而後再進行解析;
  2. 使用analyzer結合數據hive源數據Metastore(新的catalog)進行綁定,生成resolved LogicalPlan;
  3. 使用optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan,優化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進行預處理;
  4. 使用hivePlanner將LogicalPlan轉換成PhysicalPlan;
  5. 使用prepareForExecution()將PhysicalPlan轉換成可執行物理計劃;
  6. 使用execute()執行可執行物理計劃;
  7. 執行後,使用map(_.copy)將結果導入SchemaRDD。
hiveContxt還有不少針對hive的特性,更細節的內容參看源碼。

 
4:catalyst優化器
      sparkSQL1.1整體上由四個模塊組成:core、catalyst、hive、hive-Thriftserver:
  • core處理數據的輸入輸出,從不一樣的數據源獲取數據(RDD、Parquet、json等),將查詢結果輸出成schemaRDD;
  • catalyst處理查詢語句的整個處理過程,包括解析、綁定、優化、物理計劃等,說其是優化器,還不如說是查詢引擎;
  • hive對hive數據的處理
  • hive-ThriftServer提供CLI和JDBC/ODBC接口
      在這四個模塊中,catalyst處於最核心的部分,其性能優劣將影響總體的性能。因爲發展時間尚短,還有不少不足的地方,但其插件式的設計,爲將來的發展留下了很大的空間。下面是catalyst的一個設計圖:

 其中虛線部分是之後版本要實現的功能,實線部分是已經實現的功能。從上圖看,catalyst主要的實現組件有:
  • sqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;
  • Analyzer,主要完成綁定工做,將不一樣來源的Unresolved LogicalPlan和數據元數據(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan;
  • optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan;
  • Planner將LogicalPlan轉換成PhysicalPlan;
  • CostModel,主要根據過去的性能統計數據,選擇最佳的物理執行計劃
這些組件的基本實現方法:
  • 先將sql語句經過解析生成Tree,而後在不一樣階段使用不一樣的Rule應用到Tree上,經過轉換完成各個組件的功能。
  • Analyzer使用Analysis Rules,配合數據元數據(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan;
  • optimizer使用Optimization Rules,對resolved LogicalPlan進行合併、列裁剪、過濾器下推等優化做業而轉換成optimized LogicalPlan;
  • Planner使用Planning Strategies,對optimized LogicalPlan
關於本篇中涉及到的相關概念和組件在下篇再詳細介紹。

三:sparkSQL組件之解析

上篇在整體上介紹了sparkSQL的運行架構及其基本實現方法(Tree和Rule的配合),也大體介紹了sparkSQL中涉及到的各個概念和組件。本篇將詳細地介紹一下關鍵的一些概念和組件,因爲hiveContext繼承自sqlContext,關鍵的概念和組件相似,只不事後者針對hive的特性作了一些修正和重寫,因此本篇就只介紹sqlContext的關鍵的概念和組件。
  • 概念:
    • LogicalPlan
  • 組件:
    • SqlParser
    • Analyzer
    • Optimizer
    • Planner

1:LogicalPlan
在sparkSQL的運行架構中,LogicalPlan貫穿了大部分的過程,其中catalyst中的SqlParser、Analyzer、Optimizer都要對LogicalPlan進行操做。LogicalPlan的定義以下:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  self: Product =>
  case class Statistics(
    sizeInBytes: BigInt
  )
  lazy val statistics: Statistics = {
    if (children.size == 0) {
      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
    }

    Statistics(
      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
  }

  /**
   * Returns the set of attributes that this node takes as
   * input from its children.
   */
  lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))

  /**
   * Returns true if this expression and all its children have been resolved to a specific schema
   * and false if it is still contains any unresolved placeholders. Implementations of LogicalPlan
   * can override this (e.g.
   * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
   * should return `false`).
   */
  lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved

  /**
   * Returns true if all its children of this query plan have been resolved.
   */
  def childrenResolved: Boolean = !children.exists(!_.resolved)

  /**
   * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
   * nodes of this LogicalPlan. The attribute is expressed as
   * as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolveChildren(name: String): Option[NamedExpression] =
    resolve(name, children.flatMap(_.output))

  /**
   * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
   * LogicalPlan. The attribute is expressed as string in the following form:
   * `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolve(name: String): Option[NamedExpression] =
    resolve(name, output)

  /** Performs attribute resolution given a name and a sequence of possible attributes. */
  protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
    val parts = name.split("\\.")
    val options = input.flatMap { option =>
      val remainingParts =
        if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
    }

    options.distinct match {
      case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case Seq((a, nestedFields)) =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don't know how to resolve these field references
        }
      case Seq() => None         // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
  }
}</p>
在LogicalPlan裏維護者一套統計數據和屬性數據,也提供瞭解析方法。同時延伸了三種類型的LogicalPlan:
  • LeafNode:對應於trees.LeafNode的LogicalPlan
  • UnaryNode:對應於trees.UnaryNode的LogicalPlan
  • BinaryNode:對應於trees.BinaryNode的LogicalPlan
而對於SQL語句解析時,會調用和SQL匹配的操做方法來進行解析;這些操做分四大類,最終生成LeafNode、UnaryNode、BinaryNode中的一種:
  • basicOperators:一些數據基本操做,如Ioin、Union、Filter、Project、Sort
  • commands:一些命令操做,如SetCommand、CacheCommand
  • partitioning:一些分區操做,如RedistributeData
  • ScriptTransformation:對腳本的處理,如ScriptTransformation
  • LogicalPlan類的整體架構以下所示

 
2:SqlParser
SqlParser的功能就是將SQL語句解析成Unresolved LogicalPlan。現階段的SqlParser語法解析功能比較簡單,支持的語法比較有限。其解析過程當中有兩個關鍵組件和一個關鍵函數:
  • 詞法讀入器SqlLexical,其做用就是將輸入的SQL語句進行掃描、去空、去註釋、校驗、分詞等動做。
  • SQL語法表達式query,其做用定義SQL語法表達式,同時也定義了SQL語法表達式的具體實現,即將不一樣的表達式生成不一樣sparkSQL的Unresolved LogicalPlan。
  • 函數phrase(),上面個兩個組件經過調用phrase(query)(new lexical.Scanner(input)),完成對SQL語句的解析;在解析過程當中,SqlLexical一邊讀入,一邊解析,若是碰上生成符合SQL語法的表達式時,就調用相應SQL語法表達式的具體實現函數,將SQL語句解析成Unresolved LogicalPlan。
下面看看sparkSQL的整個解析過程和相關組件:
A:解析過程
首先,在sqlContext中使用下面代碼調用catalyst.SqlParser:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"><span style="white-space: normal;">/*源自 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */</span></span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"><span style="white-space: normal;">  protected[sql] val parser = new catalyst.SqlParser</span></span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"><span style="white-space: normal;">  protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)</span></span></p>
而後,直接在SqlParser的apply方法中對輸入的SQL語句進行解析,解析功能的核心代碼就是:
phrase(query)(new lexical.Scanner(input))
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">class SqlParser extends StandardTokenParsers with PackratParsers {
  def apply(input: String): LogicalPlan = {
    if (input.trim.toLowerCase.startsWith("set")) {
      //set設置項的處理</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">    ......
    } else {
      <span style="color: rgb(255, 0, 0);">phrase(query)(new lexical.Scanner(input))</span> match {
        case Success(r, x) => r
        case x => sys.error(x.toString)
      }
    }
  }</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">......</p>
能夠看得出來,該語句就是調用 phrase()函數,使用SQL語法表達式query,對詞法讀入器lexical讀入的SQL語句進行解析,其中詞法讀入器lexical經過重寫語句:override val lexical = new SqlLexical(reservedWords) 調用擴展了功能的SqlLexical。其定義:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</span>  </p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">// Use reflection to find the reserved words defined in this class.
  protected val reservedWords =
    this.getClass
      .getMethods
      .filter(_.getReturnType == classOf[Keyword])
      .map(_.invoke(this).asInstanceOf[Keyword].str)

  override val lexical = new SqlLexical(reservedWords)</p>
爲了加深對SQL語句解析過程的理解,讓咱們看看下面這個簡單數字表達式解析過程來講明:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical._

object mylexical extends StandardTokenParsers with PackratParsers {
  //定義分割符
  lexical.delimiters ++= List(".", ";", "+", "-", "*")
  //定義表達式,支持加,減,乘
  lazy val expr: PackratParser[Int] = plus | minus | multi
  //加法表示式的實現
  lazy val plus: PackratParser[Int] = num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt}
  //減法表達式的實現
  lazy val minus: PackratParser[Int] = num ~ "-" ~ num ^^ { case n1 ~ "-" ~ n2 => n1.toInt - n2.toInt}
  //乘法表達式的實現
  lazy val multi: PackratParser[Int] = num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}
  lazy val num = numericLit

  def parse(input: String) = {</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">    //定義詞法讀入器myread,並將掃描頭放置在input的首位</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">    val myread = new PackratReader(new lexical.Scanner(input)) 
    print("處理表達式 " + input)
    phrase(expr)(myread) match {
      case Success(result, _) => println(" Success!"); println(result); Some(result)
      case n => println(n); println("Err!"); None
    }
  }

  def main(args: Array[String]) {
    val prg = "6 * 3" :: "24-/*aaa*/4" :: "a+5" :: "21/3" :: Nil
    prg.map(parse)
  }
}</p>
運行結果:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">處理表達式 6 * 3 Success!     //lexical對空格進行了處理,獲得6*3
18     //6*3符合乘法表達式,調用n1.toInt * n2.toInt,獲得結果並返回</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">處理表達式 24-/*aaa*/4 Success!  //lexical對註釋進行了處理,獲得20-4
20    //20-4符合減法表達式,調用n1.toInt - n2.toInt,獲得結果並返回
處理表達式 a+5[1.1] failure: number expected 
      //lexical在解析到a,發現不是整數型,故報錯誤位置和內容
a+5
^
Err!
處理表達式 21/3[1.3] failure: ``*'' expected but ErrorToken(illegal character) found
      //lexical在解析到/,發現不是分割符,故報錯誤位置和內容
21/3
  ^
Err!</p>
      在運行的時候,首先對錶達式 6 * 3 進行解析,詞法讀入器myread將掃描頭置於6的位置;當phrase()函數使用定義好的數字表達式expr處理6 * 3的時候,6 * 3每讀入一個詞法,就和expr進行匹配,如讀入6*和expr進行匹配,先匹配表達式plus,*和+匹配不上;就繼續匹配表達式minus,*和-匹配不上;就繼續匹配表達式multi,此次匹配上了,等讀入3的時候,由於3是num類型,就調用調用n1.toInt * n2.toInt進行計算。
      注意,這裏的expr、plus、minus、multi、num都是表達式,|、~、^^是複合因子,表達式和複合因子能夠組成一個新的表達式,如plus(num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt})就是一個由num、+、num、函數構成的複合表達式;而expr(plus | minus | multi)是由plus、minus、multi構成的複合表達式;複合因子的含義定義在類scala/util/parsing/combinator/Parsers.scala,下面是幾個經常使用的複合因子:
  • p ~ q p成功,纔會q;放回p,q的結果
  • p ~> q p成功,纔會q,返回q的結果
  • p <~ q p成功,纔會q,返回p的結果
  • p | q p失敗則q,返回第一個成功的結果
  • p ^^ f 若是p成功,將函數f應用到p的結果上
  • p ^? f 若是p成功,若是函數f能夠應用到p的結果上的話,就將p的結果用f進行轉換
      針對上面的6 * 3使用的是multi表達式(num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}),其含義就是:num後跟*再跟num,若是知足就將使用函數n1.toInt * n2.toInt。
      到這裏爲止,你們應該明白整個解析過程了吧, sparkSQL1.1入門之三:sparkSQL組件之解析 - mmicky - mmicky 的博客。SqlParser的原理和這個表達式解析器使用了同樣的原理,只不過是定義的SQL語法表達式query複雜一些,使用的詞法讀入器更豐富一些而已。下面分別介紹一下相關組件SqlParser、SqlLexical、query。

B:SqlParser
首先,看看SqlParser的UML圖:

其次,看看SqlParser的定義,SqlParser繼承自類StandardTokenParsers和特質PackratParsers:
其中,PackratParsers:
  • 擴展了scala.util.parsing.combinator.Parsers所提供的parser,作了內存化處理;
  • Packrat解析器實現了回溯解析和遞歸降低解析,具備無限先行和線性分析時的優點。同時,也支持左遞歸詞法解析。
  • 從Parsers中繼承出來的class或trait均可以使用PackratParsers,如:object MyGrammar extends StandardTokenParsers with PackratParsers;
  • PackratParsers將分析結果進行緩存,所以,PackratsParsers須要PackratReader(內存化處理的Reader)做爲輸入,程序員能夠手工建立PackratReader,如production(new PackratReader(new lexical.Scanner(input))),更多的細節參見scala庫中/scala/util/parsing/combinator/PackratParsers.scala文件。
StandardTokenParsers是最終繼承自Parsers
  • 增長了詞法的處理能力(Parsers是字符處理),在StdTokenParsers中定義了四種基本詞法:
    • keyword tokens
    • numeric literal tokens
    • string literal tokens
    • identifier tokens
  • 定義了一個詞法讀入器lexical,能夠進行詞法讀入
SqlParser在進行解析SQL語句的時候是調用了PackratParsers中phrase():
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 scala/util/parsing/combinator/PackratParsers.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"> /**
   *  A parser generator delimiting whole phrases (i.e. programs).
   *
   *  Overridden to make sure any input passed to the argument parser
   *  is wrapped in a `PackratReader`.
   */
  override def phrase[T](p: Parser[T]) = {
    val q = super.phrase(p)
    new PackratParser[T] {
      def apply(in: Input) = in match {
        case in: PackratReader[_] => q(in)
        case in => q(new PackratReader(in))
      }
    }
  }</p>
在解析過程當中,通常會定義多個表達式,如上面例子中的plus | minus | multi,一旦前一個表達式不能解析的話,就會調用下一個表達式進行解析:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/</span>*源自 scala/util/parsing/combinator/Parsers.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">    def append[U >: T](p0: => Parser[U]): Parser[U] = { lazy val p = p0 // lazy argument
      Parser{ in => this(in) append p(in)}
    }</p>
表達式解析正確後,具體的實現函數是在 PackratParsers中完成:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 scala/util/parsing/combinator/PackratParsers.scala */</span>  </p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"> def memo[T](p: super.Parser[T]): PackratParser[T] = {
    new PackratParser[T] {
      def apply(in: Input) = {
        val inMem = in.asInstanceOf[PackratReader[Elem]]

        //look in the global cache if in a recursion
        val m = recall(p, inMem)
        m match {
          //nothing has been done due to recall
          case None =>
            val base = LR(Failure("Base Failure",in), p, None)
            inMem.lrStack = base::inMem.lrStack
            //cache base result
            inMem.updateCacheAndGet(p,MemoEntry(Left(base)))
            //parse the input
            val tempRes = p(in)
            //the base variable has passed equality tests with the cache
            inMem.lrStack = inMem.lrStack.tail
            //check whether base has changed, if yes, we will have a head
            base.head match {
              case None =>
                /*simple result*/
                inMem.updateCacheAndGet(p,MemoEntry(Right(tempRes)))
                tempRes
              case s@Some(_) =>
                /*non simple result*/
                base.seed = tempRes
                //the base variable has passed equality tests with the cache
                val res = lrAnswer(p, inMem, base)
                res
            }

          case Some(mEntry) => {
            //entry found in cache
            mEntry match {
              case MemoEntry(Left(recDetect)) => {
                setupLR(p, inMem, recDetect)
                //all setupLR does is change the heads of the recursions, so the seed will stay the same
                recDetect match {case LR(seed, _, _) => seed.asInstanceOf[ParseResult[T]]}
              }
              case MemoEntry(Right(res: ParseResult[_])) => res.asInstanceOf[ParseResult[T]]
            }
          }
        }
      }
    }
  }</p>
StandardTokenParsers增長了詞法處理能力,SqlParers定義了大量的關鍵字,重寫了詞法讀入器,將這些關鍵字應用於詞法讀入器。

C:SqlLexical
詞法讀入器SqlLexical擴展了StdLexical的功能,首先增長了大量的關鍵字:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">  </span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">  protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  ......
  protected val SUBSTR = Keyword("SUBSTR")
  protected val SUBSTRING = Keyword("SUBSTRING")</p>
其次豐富了分隔符、詞法處理、空格註釋處理:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"> </span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">delimiters += (
      "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
      ",", ";", "%", "{", "}", ":", "[", "]"
  )

  override lazy val token: Parser[Token] = (
    identChar ~ rep( identChar | digit ) ^^
      { case first ~ rest => processIdent(first :: rest mkString "") }
      | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
      case i ~ None    => NumericLit(i mkString "")
      case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
    }
      | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
      { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
      | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
      { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
      | EofCh ^^^ EOF
      | '\'' ~> failure("unclosed string literal")
      | '\"' ~> failure("unclosed string literal")
      | delim
      | failure("illegal character")
    )

  override def identChar = letter | elem('_') | elem('.')

  override def whitespace: Parser[Any] = rep(
    whitespaceChar
      | '/' ~ '*' ~ comment
      | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
      | '#' ~ rep( chrExcept(EofCh, '\n') )
      | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
      | '/' ~ '*' ~ failure("unclosed comment")
  )</p>
最後看看SQL語法表達式query。

D:query
SQL語法表達式支持3種操做:select、insert、cache
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"> </span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">protected lazy val query: Parser[LogicalPlan] = (
    select * (
        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
        INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
    | insert | cache
  )</p>
而這些操做還有具體的定義,如select,這裏開始定義了具體的函數,將SQL語句轉換成構成 Unresolved LogicalPlan的一些Node
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;"><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;"> </span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">  protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> opt(DISTINCT) ~ projections ~
    opt(from) ~ opt(filter) ~
    opt(grouping) ~
    opt(having) ~
    opt(orderBy) ~
    opt(limit) <~ opt(";") ^^ {
      case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
        val base = r.getOrElse(NoRelation)
        val withFilter = f.map(f => Filter(f, base)).getOrElse(base)
        val withProjection =
          g.map {g =>
            Aggregate(assignAliases(g), assignAliases(p), withFilter)
          }.getOrElse(Project(assignAliases(p), withFilter))
        val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
        val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
        val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
        val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
        withLimit
  }
</p><div>
</div>

3:Analyzer
Analyzer的功能就是對來自SqlParser的Unresolved LogicalPlan中的UnresolvedAttribute項和UnresolvedRelation項,對照catalog和FunctionRegistry生成Analyzed LogicalPlan 。Analyzer定義了5大類14小類的rule:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 <span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/src/main/scala/org/apache/spark/</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/analysis/Analyzer.scala */</span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      ResolveSortReferences ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      UnresolvedHavingClauseAttributes :: 
      typeCoercionRules :_*),
    Batch("Check Analysis", Once,
      CheckResolution),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )</p>
  • MultiInstanceRelations
    • NewRelationInstances
  • CaseInsensitiveAttributeReferences
    • LowercaseAttributeReferences
  • Resolution
    • ResolveReferences
    • ResolveRelations
    • ResolveSortReferences 
    • NewRelationInstances 
    • ImplicitGenerate
    • StarExpansion
    • ResolveFunctions
    • GlobalAggregates
    • UnresolvedHavingClauseAttributes
    • typeCoercionRules
  • Check Analysis
    • CheckResolution
  • AnalysisOperators
    • EliminateAnalysisOperators
這些rule 都是使用transform對 Unresolved LogicalPlan進行操做,其中 typeCoercionRules是對HiveQL語義進行處理,在其下面又定義了多個rule:PropagateTypes、ConvertNaNs、WidenTypes、PromoteStrings、BooleanComparisons、BooleanCasts、StringToIntegralCasts、FunctionArgumentConversion、CaseWhenCoercion、Division,一樣了這些rule也是使用 transform對 Unresolved LogicalPlan進行操做。這些rule操做後,使得LogicalPlan的信息變得豐滿和易懂。下面拿其中的兩個rule來簡單介紹一下:
好比rule之 ResolveReferences,最終調用LogicalPlan的resolveChildren對列名給一名字和序號,如name#67之列的,這樣保持列的惟一性:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 <span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/src/main/scala/org/apache/spark/</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/analysis/Analyzer.scala */</span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">  object ResolveReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case q: LogicalPlan if q.childrenResolved =>
        logTrace(s"Attempting to resolve ${q.simpleString}")
        q transformExpressions {
          case u @ UnresolvedAttribute(name) =>
            // Leave unchanged if resolution fails.  Hopefully will be resolved next round.
            val result = q.resolveChildren(name).getOrElse(u)
            logDebug(s"Resolving $u to $result")
            result
        }
    }
  }</p>
      又好比rule之StarExpansion,其做用就是將Select * Fom tbl中的*展開,賦予列名:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 <span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/src/main/scala/org/apache/spark/</span><span style="font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋體;">sql/catalyst/analysis/Analyzer.scala */</span></p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">  object StarExpansion extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      // Wait until children are resolved
      case p: LogicalPlan if !p.childrenResolved => p
      // If the projection list contains Stars, expand it.
      case p @ Project(projectList, child) if containsStar(projectList) =>
        Project(
          projectList.flatMap {
            case s: Star => s.expand(child.output)
            case o => o :: Nil
          },
          child)
      case t: ScriptTransformation if containsStar(t.input) =>
        t.copy(
          input = t.input.flatMap {
            case s: Star => s.expand(t.child.output)
            case o => o :: Nil
          }
        )
      // If the aggregate function argument contains Stars, expand it.
      case a: Aggregate if containsStar(a.aggregateExpressions) =>
        a.copy(
          aggregateExpressions = a.aggregateExpressions.flatMap {
            case s: Star => s.expand(a.child.output)
            case o => o :: Nil
          }
        )
    }

    /**
     * Returns true if `exprs` contains a [[Star]].
     */
    protected def containsStar(exprs: Seq[Expression]): Boolean =
      exprs.collect { case _: Star => true }.nonEmpty
  }
}</p>

4:Optimizer
Optimizer的功能就是未來自Analyzer的Analyzed LogicalPlan進行多種rule優化,生成Optimized LogicalPlan。Optimizer定義了3大類12個小類的優化rule:
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      LikeSimplification,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}</p>
  • Combine Limits 合併Limit
    • CombineLimits:將兩個相鄰的limit合爲一個
  • ConstantFolding 常量疊加
    • NullPropagation 空格處理 
    • ConstantFolding:常量疊加
    • LikeSimplification:like表達式簡化
    • BooleanSimplification:布爾表達式簡化
    • SimplifyFilters:Filter簡化
    • SimplifyCasts:Cast簡化
    • SimplifyCaseConversionExpressions:CASE大小寫轉化表達式簡化 
  • Filter Pushdown Filter下推
    • CombineFilters Filter合併 
    • PushPredicateThroughProject 經過Project謂詞下推 
    • PushPredicateThroughJoin 經過Join謂詞下推 
    • ColumnPruning 列剪枝 
這些優化rule都是使用transform對LogicalPlan進行操做,如合併、刪除冗餘、簡化、剪枝等,是整個LogicalPlan變得更簡潔更高效。
好比將兩個相鄰的limit進行合併,可使用 CombineLimits。 象sql("select * from (select * from src limit 5)a limit 3 ") 這樣一個SQL語句,會將limit 5和limit 3進行合併,只剩一個一個limit 3。
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      Limit(If(LessThan(ne, le), ne, le), grandChild)
  }
}</p>
      又好比Null值的處理,可使用NullPropagation處理。象sql("select count(null) from src where key is not null")這樣一個SQL語句會轉換成sql("select count(0) from src where key is not null")來處理。
<p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */</p><p style="margin-top: 0px; margin-bottom: 0px; padding-top: 0px; padding-bottom: 0px;">object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ EqualNullSafe(Literal(null, _), r) => IsNull(r)
      case e @ EqualNullSafe(l, Literal(null, _)) => IsNull(l)

      ......
    }
  }
}</p>
      對於具體的優化方法可使用下一章所介紹的hive/console調試方法進行調試,用戶可使用自定義的優化函數,也可使用sparkSQL提供的優化函數。使用前先定義一個要優化查詢,而後查看一下該查詢的Analyzed LogicalPlan,再使用優化函數去優化,將生成的Optimized LogicalPlan和Analyzed LogicalPlan進行比較,就能夠看到優化的效果。

四:深刻了解sparkSQL運行計劃

前面兩章花了很多篇幅介紹了SparkSQL的運行過程,不少讀者仍是以爲其中的概念很抽象,好比Unresolved LogicPlan、LogicPlan、PhysicalPlan是長得什麼樣子,沒點印象,只知道名詞,感受很縹緲。本章就着重介紹一個工具hive/console,來加深讀者對sparkSQL的運行計劃的理解。

1:hive/console安裝
      sparkSQL從1.0.0開始提供了一個sparkSQL的調試工具hive/console。該工具是給開發者使用,在編譯生成的安裝部署包中並無;該工具須要使用sbt編譯運行。要使用該工具,須要具有如下條件:
  • spark1.1.0源碼
  • hive0.12源碼並編譯
  • 配置環境變量

1.1:安裝hive/cosole
下面是筆者安裝過程:
A:下載spark1.1.0源碼,安裝在/app/hadoop/spark110_sql目錄
B:下載hive0.12源碼,安裝在/app/hadoop/hive012目錄,進入src目錄後,使用下面命令進行編譯:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. ant clean package -Dhadoop.version=2.2.0 -Dhadoop-0.23.version=2.2.0 -Dhadoop.mr.rev=23  
C:配置環境變量文件~/.bashrc後,source ~/.bashrc使環境變量生效。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. export HIVE_HOME=/app/hadoop/hive012/src/build/dist  
  2. export HIVE_DEV_HOME=/app/hadoop/hive012/src  
  3. export HADOOP_HOME=/app/hadoop/hadoop220  
D:啓動
切換到spark安裝目錄/app/hadoop/spark110_sql,運行命令:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sbt/sbt hive/console  
通過一段漫長的sbt編譯過程,最後出現以下界面:

在控制檯的scala提示符下,輸入:help能夠獲取幫助,輸入Tab鍵會陳列出當前可用的方法、函數、及變量。下圖爲按Tab鍵時顯示的方法和函數,隨着用戶不斷使用該控制態,用戶定義或使用過的變量也會陳列出來。

 
1.2:hive/console原理
      hive/console的調試原理很簡單,就是在scala控制檯裝載了catalyst中幾個關鍵的class,其中的TestHive預約義了表結構並裝載命令,這些數據是hive0.12源碼中帶有的測試數據,裝載這些數據是按需執行的;這些數據位於/app/hadoop/hive012/src/data中,也就是$HIVE_DEV_HOME/data中。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1.  /*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */  
  2.   // The test tables that are defined in the Hive QTestUtil.  
  3.   // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java  
  4.   val hiveQTestUtilTables = Seq(  
  5.     TestTable("src",  
  6.       "CREATE TABLE src (key INT, value STRING)".cmd,  
  7.       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),  
  8.     TestTable("src1",  
  9.       "CREATE TABLE src1 (key INT, value STRING)".cmd,  
  10.       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),  
  11.     TestTable("srcpart", () => {  
  12.       runSqlHive(  
  13.         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")  
  14.       for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {  
  15.         runSqlHive(  
  16.           s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'  
  17.              |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')  
  18.            """.stripMargin)  
  19.       }  
  20.     }),  
  21. ......  
  22. )  
由於要使用hive0.12的測試數據,因此須要定義兩個環境變量:HIVE_HOME和HIVE_DEV_HOME,若是使用hive0.13的話,用戶須要更改到相應目錄:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. /*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */  
  2.  /** The location of the compiled hive distribution */  
  3.  lazy val hiveHome = envVarToFile("HIVE_HOME")  
  4.  /** The location of the hive source code. */  
  5.  lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")  
另外,若是用戶想在hive/console啓動的時候,預載更多的class,能夠修改spark源碼下的 project/SparkBuild.scala文件
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1.  /* 源自 project/SparkBuild.scala */  
  2. object Hive {  
  3.   lazy val settings = Seq(  
  4.     javaOptions += "-XX:MaxPermSize=1g",  
  5.     // Multiple queries rely on the TestHive singleton. See comments there for more details.  
  6.     parallelExecution in Test :false,  
  7.     // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings  
  8.     // only for this subproject.  
  9.     scalacOptions <<= scalacOptions map { currentOpts: Seq[String] =>  
  10.       currentOpts.filterNot(_ == "-deprecation")  
  11.     },  
  12.     initialCommands in console :=  
  13.       """  
  14.         |import org.apache.spark.sql.catalyst.analysis._  
  15.         |import org.apache.spark.sql.catalyst.dsl._  
  16.         |import org.apache.spark.sql.catalyst.errors._  
  17.         |import org.apache.spark.sql.catalyst.expressions._  
  18.         |import org.apache.spark.sql.catalyst.plans.logical._  
  19.         |import org.apache.spark.sql.catalyst.rules._  
  20.         |import org.apache.spark.sql.catalyst.types._  
  21.         |import org.apache.spark.sql.catalyst.util._  
  22.         |import org.apache.spark.sql.execution  
  23.         |import org.apache.spark.sql.hive._  
  24.         |import org.apache.spark.sql.hive.test.TestHive._  
  25.         |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin  
  26.   )  
  27. }  

2:經常使用操做
      下面介紹一下hive/console的經常使用操做,主要是和運行計劃相關的經常使用操做。在操做前,首先定義一個表people和查詢query:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. //在控制檯逐行運行  
  2. case class Person(name:String, age:Int, state:String)  
  3. sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")  
  4. val querysql("select * from people")  
2.1 查看查詢的schema
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.printSchema  


2.2 查看查詢的整個運行計劃
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.queryExecution  


2.3 查看查詢的Unresolved LogicalPlan
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.queryExecution.logical  


2.4 查看查詢的analyzed LogicalPlan
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.queryExecution.analyzed  


 2.5 查看優化後的LogicalPlan
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.queryExecution.optimizedPlan  


2.6 查看物理計劃
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.queryExecution.sparkPlan  


2.7 查看RDD的轉換過程
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. query.toDebugString  


2.8 更多的操做
      更多的操做能夠經過Tab鍵陳列出來,也能夠參開sparkSQL的API,也能夠參看源代碼中的方法和函數。

3:不一樣數據源的運行計劃
      上面經常使用操做裏介紹了源自RDD的數據,咱們都知道,sparkSQL能夠源自多個數據源:jsonFile、parquetFile、hive。下面看看這些數據源的schema:
3.1 json文件
      json文件支持嵌套表,sparkSQL也能夠讀入嵌套表,以下面形式的json數據,經修整(去空格和換行符)保存後,可使用jsonFile讀入sparkSQL。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. {    
  2.    "fullname": "Sean Kelly",       
  3.    "org": "SK Consulting",       
  4.    "emailaddrs": [       
  5.       {"type": "work", "value": "kelly@seankelly.biz"},       
  6.       {"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}       
  7.    ],       
  8.     "telephones": [       
  9.       {"type": "work", "pref": 1, "value": "+1 214 555 1212"},       
  10.       {"type": "fax", "value": "+1 214 555 1213"},       
  11.       {"type": "mobile", "value": "+1 214 555 1214"}       
  12.    ],       
  13.    "addresses": [       
  14.       {"type": "work", "format": "us",       
  15.        "value": "1234 Main StnSpringfield, TX 78080-1216"},       
  16.       {"type": "home", "format": "us",       
  17.        "value": "5678 Main StnSpringfield, TX 78080-1316"}       
  18.    ],       
  19.     "urls": [       
  20.       {"type": "work", "value": "http://seankelly.biz/"},       
  21.       {"type": "home", "value": "http://seankelly.tv/"}       
  22.    ]       
  23. }  
去空格和換行符後保存爲/home/mmicky/data/nestjson.json,使用jsonFile讀入並註冊成表jsonPerson,而後定義一個查詢jsonQuery:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. jsonFile("/home/mmicky/data/nestjson.json").registerTempTable("jsonPerson")  
  2. val jsonQuery = sql("select * from jsonPerson")  
查看jsonQuery的schema:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. jsonQuery.printSchema  

查看jsonQuery的整個運行計劃:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. jsonQuery.queryExecution  


3.2 parquet文件
      parquet文件讀入並註冊成表parquetWiki,而後定義一個查詢parquetQuery:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. parquetFile("/home/mmicky/data/spark/wiki_parquet").registerTempTable("parquetWiki")  
  2. val parquetQuery = sql("select * from parquetWiki")  
查詢parquetQuery的schema:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. parquetQuery.printSchema  

查詢parquetQuery的整個運行計劃:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. parquetQuery.queryExecution  


3.3 hive數據
      以前說了,TestHive類中已經定義了大量的hive0.12的測試數據的表格式,如src、sales等等,在hive/console裏能夠直接使用;第一次使用的時候,hive/console會裝載一次。下面咱們使用sales表看看其schema和整個運行計劃。首先定義一個查詢hiveQuery:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. val hiveQuery = sql("select * from sales")  
查看hiveQuery的schema:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. hiveQuery.printSchema  

查看hiveQuery的整個運行計劃:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. hiveQuery.queryExecution  

從上面能夠看出,來自jsonFile、parquetFile、hive數據的物理計劃還有有很大區別的。

4:不一樣查詢的運行計劃
      爲了加深理解,咱們列幾個經常使用查詢的運行計劃和RDD轉換過程。
4.1 聚合查詢
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select state,avg(age) from people group by state").queryExecution  

[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select state,avg(age) from people group by state").toDebugString  

 
4.2 join操做
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution  

[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString  


4.3 Distinct操做
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution  

[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString  

 
5:查詢的優化
      上面的查詢比較簡單,看不出優化的過程,下面看幾個例子,能夠理解sparkSQL的優化過程。
5.1 CombineFilters
      CombineFilters就是合併Filter,在含有多個Filter時發生,以下查詢:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution  

上面的查詢,在Optimized的過程當中,將age>=19和age<30這兩個Filter合併了,合併成((age>=19) && (age<30))。其實上面還作了一個其餘的優化,就是project的下推,子查詢使用了表的全部列,而主查詢使用了列name,在查詢數據的時候子查詢優化成只查列name。

5.2 PushPredicateThroughProject
      PushPredicateThroughProject就是project下推,和上面例子中的project同樣。
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select name from (select name,state as location from people) a where location='CA'").queryExecution  

 
5.3 ConstantFolding
      ConstantFolding是常量疊加,用於表達式。以下面的例子:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. sql("select name,1+2 from people").queryExecution  

在Optimized的過程當中,將常量表達式直接累加在一塊兒,用新的列名來表示。

5.4 自定義優化
      在sparkSQL中的Optimizer中定義了3類12中優化方法,這裏再也不一一陳列。對於用於自定義的優化,在hive/console也能夠很方便的調試。只要先定義一個LogicalPlan,而後使用自定義的優化函數進行測試就能夠了。下面就舉個和CombineFilters同樣的例子,首先定義一個函數:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. object CombineFilters extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case Filter(c1, Filter(c2, grandChild)) =>  
  4.       Filter(And(c1,c2),grandChild)  
  5.   }  
  6. }  
而後定義一個query,並使用query.queryExecution.analyzed查看優化前的LogicPlan:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. val querysql("select * from people").where('age >=19).where('age <30)  
  2. query.queryExecution.analyzed  

最後,使用自定義優化函數進行優化:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. CombineFilters(query.queryExecution.analyzed)  

能夠看到兩個Filter合併在一塊兒了。
甚至,在hive/console裏直接使用transform對LogicPlan應用定義好的rule,下面定義了一個query,並使用query.queryExecution.analyzed查看應用rule前的LogicPlan:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. val hiveQuery = sql("SELECT * FROM (SELECT * FROM src) a")  
  2. hiveQuery.queryExecution.analyzed  

而後,直接用transform將自定義的rule:
[html]  view plain copy print ? 在CODE上查看代碼片 派生到個人代碼片
  1. hiveQuery.queryExecution.analyzed transform {  
  2.    case Project(projectList, child) if projectList == child.output => child  
  3.  }  

該transform在LogicPlan的主查詢和子查詢的project相同時合併project。

      通過上面的例子,加上本身的理解,相信大部分的讀者對sparkSQL中的運行計劃應該有了比較明確的瞭解。

五:測試環境之搭建

前面介紹了sparkSQL的運行架構,後面將介紹sparkSQL的使用。在介紹sparkSQL的使用以前,咱們須要搭建一個sparkSQL的測試環境。本次測試環境涉及到hadoop之HDFS、hive、spark以及相關的數據文件,相關的信息以下:

測試環境示意圖:

 
      本測試環境是在一臺物理機上搭建的,物理機的配置是16G內存,4核8線程CPU。hadoop一、hadoop二、hadoop3是vitual box虛擬機,構建hadoop集羣和spark集羣;物理機wyy做爲客戶端,編寫代碼和提交計算任務。總的測試環境配置以下:
 機器名  配置  角色  軟件安裝
 hadoop1  4G內存,1核  hadoop:NN/DN Spark:Master/worker /app/hadoop/hadoop220
/app/hadoop/spark110
/app/scala2104
/usr/java/jdk1.7.0_21
 hadoop2  4G內存,1核  hadoop:DN Spark:worker
 hive0.13客戶端
/app/hadoop/hadoop220
/app/hadoop/spark110
/app/hadoop/hive013
/app/scala2104
/usr/java/jdk1.7.0_21
 hadoop3  4G內存,1核  hadoop:DN Spark:worker
 hive0.13 metaserver service
 mysql server
/app/hadoop/hadoop220
/app/hadoop/spark100
/app/hadoop/hive013
/app/scala2104
/usr/java/jdk1.7.0_21
MySQL5.6.12
 wyy  16G內存,4核  client
 
hive0.13客戶端
/app/hadoop/hadoop220
/app/hadoop/spark110
/app/hadoop/hive013
以上hadoop220、spark、hive安裝目錄的用戶屬性都是hadoop(組別爲hadoop),其餘安裝目錄的用戶屬性是root:root。

      測試環境搭建順序
1:虛擬集羣的搭建(hadoop一、hadoop二、hadoop3)
A:hadoop2.2.0集羣搭建
或者參看視頻  http://pan.baidu.com/s/1qWqFY4c 提取密碼:xv4i

B:MySQL的安裝

C:hive的安裝
本測試中使用的hive0.13,和hive0.11的安裝同樣。
hive安裝在hadoop三、hadoop二、wyy。其中hadoop3啓動metastore serive;hadoop二、wyy配置uris後做爲hive的客戶端。

D:Spark1.1.0 Standalone集羣搭建
這裏須要注意的是,本測試中使用的是spark1.1.0,部署包生成命令make-distribution.sh的參數發生了變化,spark1.1.0的make-distribution.sh使用格式:
[html]  view plain copy print ?
  1. ./make-distribution.sh [--name] [--tgz] [--with-tachyon] <maven build options>  
參數的含義:
--with-tachyon:是否支持內存文件系統Tachyon,不加此參數時爲不支持。
--tgz:在根目錄下生成 spark-$VERSION-bin.tar.gz,不加此參數是不生成tgz文件,只生成/dist目錄。
--name NAME :和— tgz 結合能夠生成 spark-$VERSION-bin-$NAME.tgz 的部署包,不加此參數時 NAME 爲 hadoop 的版本號。
maven build options:使用maven編譯時可使用的配置選項,如使用-P、-D的選項
本次要生成基於hadoop2.2.0和yarn並集成hive、ganglia、asl的spark1.1.0部署包,可使用命令:
[html]  view plain copy print ?
  1. ./make-distribution.sh --tgz --name 2.2.0 -Pyarn -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl -Phive  
最後生成部署包spark-1.1.0-bin-2.2.0.tgz,按照測試環境的規劃進行安裝。

2:客戶端的搭建
      客戶端wyy採用的Ubuntu操做系統,而Spark虛擬集羣採用的是CentOS,默認的java安裝目錄兩個操做系統是不同的,因此在Ubuntu下安裝java的時候特地將java的安裝路徑改爲和CentOS同樣。否則的話,每次scp了虛擬集羣的配置文件以後,要修改hadoop、spark運行配置文件中的JAVA_HOME。
      客戶端hadoop2.2.0、Spark1.1.0、hive0.13是直接從虛擬集羣中scp出來的,放置在相同的目錄下,擁有相同的用戶屬性。開發工具使用的IntelliJ IDEA,程序編譯打包後複製到spark1.1.0的根目錄/app/hadoop/spark110下,使用spark-submit提交虛擬機集羣運行。

3:文件數據準備工做
      啓動hadoop2.2.0(只須要HDFS啓動就能夠了),而後將數據文件上傳到對應的目錄:

  • people.txt和people.json做爲第六節sparkSQL之基礎應用實驗數據;
  • graphx-wiki-vertices.txt和graphx-wiki-edges.txt做爲第八節sparkSQL之綜合應用中圖處理數據;
  • SogouQ.full.txt來源於Sogou實驗室,下載地址:http://download.labs.sogou.com/dl/q.html 完整版(2GB):gz格式,做爲第九節sparkSQL之調優的測試數據
 
4:hive數據準備工做
      在hive裏定義一個數據庫saledata,和三個表tblDate、tblStock、tblStockDetail,並裝載數據,具體命令:
[html]  view plain copy print ?
  1. CREATE DATABASE SALEDATA;  
  2. use SALEDATA;  
  3.   
  4. //Date.txt文件定義了日期的分類,將天天分別賦予所屬的月份、星期、季度等屬性  
  5. //日期,年月,年,月,日,周幾,第幾周,季度,旬、半月  
  6. CREATE TABLE tblDate(dateID string,theyearmonth string,theyear string,themonth string,thedate string,theweek string,theweeks string,thequot string,thetenday string,thehalfmonth string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ;  
  7.   
  8. //Stock.txt文件定義了訂單表頭  
  9. //訂單號,交易位置,交易日期  
  10. CREATE TABLE tblStock(ordernumber string,locationid string,dateID string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ;  
  11.   
  12. //StockDetail.txt文件定義了訂單明細  
  13. //訂單號,行號,貨品,數量,金額  
  14. CREATE TABLE tblStockDetail(ordernumber STRING,rownum int,itemid string,qty int,price int,amount int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ;  
  15.   
  16. //裝載數據  
  17. LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/Date.txt' INTO TABLE tblDate;  
  18.   
  19. LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/Stock.txt' INTO TABLE tblStock;  
  20.   
  21. LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/StockDetail.txt' INTO TABLE tblStockDetail;  
最終在HDFS能夠看到相關的數據:
SparkSQL引入了一種新的RDD——SchemaRDD,SchemaRDD由行對象(row)以及描述行對象中每列數據類型的schema組成;SchemaRDD很象傳統數據庫中的表。SchemaRDD能夠經過RDD、Parquet文件、JSON文件、或者經過使用hiveql查詢hive數據來創建。SchemaRDD除了能夠和RDD同樣操做外,還能夠經過registerTempTable註冊成臨時表,而後經過SQL語句進行操做。
      值得注意的是:
  • Spark1.1使用registerTempTable代替1.0版本的registerAsTable
  • Spark1.1在hiveContext中,hql()將被棄用,sql()將代替hql()來提交查詢語句,統一了接口。
  • 使用registerTempTable註冊表是一個臨時表,生命週期只在所定義的sqlContext或hiveContext實例之中。換而言之,在一個sqlontext(或hiveContext)中registerTempTable的表不能在另外一個sqlContext(或hiveContext)中使用。
      另外,spark1.1提供了語法解析器選項spark.sql.dialect,就目前而言,spark1.1提供了兩種語法解析器:sql語法解析器和hiveql語法解析器。
  • sqlContext如今只支持sql語法解析器(SQL-92語法)
  • hiveContext如今支持sql語法解析器和hivesql語法解析器,默認爲hivesql語法解析器,用戶能夠經過配置切換成sql語法解析器,來運行hiveql不支持的語法,如select 1。
切換能夠經過下列方式完成:
  • 在sqlContexet中使用setconf配置spark.sql.dialect 
  • 在hiveContexet中使用setconf配置spark.sql.dialect
  • 在sql命令中使用 set spark.sql.dialect=value

 
      sparkSQL1.1對數據的查詢分紅了2個分支:sqlContext 和 hiveContext。至於二者之間的關係,hiveSQL繼承了sqlContext,因此擁有sqlontext的特性以外,還擁有自身的特性(最大的特性就是支持hive, sparkSQL1.1入門之六:sparkSQL之基礎應用 - mmicky - mmicky 的博客)。      
      下面就sparkSQL的一些基本操做作一演示:
  • sqlContext基礎應用
    • RDD
    • parquet文件
    • json文件
  • hiveContext基礎應用
  • 混合使用
  • 緩存之使用
  • DSL之使用
      爲了方便演示,咱們在spark-shell裏面進行下列演示,並加以說明。首先,啓動spark集羣,而後在客戶端wyy上啓動spark-shell:
[html]  view plain copy print ?
  1. bin/spark-shell --master spark://hadoop1:7077 --executor-memory 3g  

1:sqlContext基礎應用
      首先建立sqlContext,並引入 sqlContext.createSchemaRDD以完成RDD隱式轉換成SchemaRDD
[html]  view plain copy print ?
  1. val sqlContextnew org.apache.spark.sql.SQLContext(sc)  
  2. import sqlContext.createSchemaRDD  

1.1:RDD
      Spark1.1.0開始提供了兩種方式將RDD轉換成SchemaRDD:
  • 經過定義case class,使用反射推斷Schema(case class方式)
  • 經過可編程接口,定義Schema,並應用到RDD上(applySchema 方式)
      前者使用簡單、代碼簡潔,適用於已知Schema的源數據上;後者使用較爲複雜,但能夠在程序運行過程當中實行,適用於未知Schema的RDD上。

1.1.1 case class方式
      對於case class方式,首先要定義case class,在RDD的transform過程當中使用case class能夠隱式轉化成SchemaRDD,而後再使用registerTempTable註冊成表。註冊成表後就能夠在sqlContext對錶進行操做,如select 、insert、join等。注意,case class能夠是嵌套的,也可使用相似Sequences 或 Arrays之類複雜的數據類型。
      下面的例子是定義一個符合數據文件/sparksql/people.txt類型的case clase(Person),而後將數據文件讀入後隱式轉換成SchemaRDD:people,並將people在sqlContext中註冊成表rddTable,最後對錶進行查詢,找出年紀在13-19歲之間的人名。

/sparksql/people.txt的內容有3行:

 運行下列代碼:
[html]  view plain copy print ?
  1. //RDD1演示  
  2. case class Person(name:String,age:Int)  
  3. val rddpeople=sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))  
  4. rddpeople.registerTempTable("rddTable")  
  5.   
  6. sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)  
運行結果:

 
1.1.2 applySchema 方式
      applySchema 方式比較複雜,一般有3步過程:
  • 從源RDD建立rowRDD
  • 建立與rowRDD匹配的Schema
  • 將Schema經過applySchema應用到rowRDD
上面的例子經過applySchema 方式實現的代碼以下:
[html]  view plain copy print ?
  1. //RDD2演示  
  2. //導入SparkSQL的數據類型和Row  
  3. import org.apache.spark.sql._  
  4.   
  5. //建立於數據結構匹配的schema  
  6. val schemaString = "name age"  
  7. val schema =  
  8.   StructType(  
  9.     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))  
  10.   
  11. //建立rowRDD  
  12. val rowRDD = sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))  
  13. //用applySchema將schema應用到rowRDD  
  14. val rddpeople2 = sqlContext.applySchema(rowRDD, schema)  
  15.   
  16. rddpeople2.registerTempTable("rddTable2")  
  17. sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)  
運行結果:

 
1.2:parquet文件
      一樣得,sqlContext能夠讀取parquet文件,因爲parquet文件中保留了schema的信息,因此不須要使用case class來隱式轉換。sqlContext讀入parquet文件後直接轉換成SchemaRDD,也能夠將SchemaRDD保存成parquet文件格式。

咱們先將上面創建的SchemaRDD:people保存成parquet文件:
[html]  view plain copy print ?
  1. rddpeople.saveAsParquetFile("/sparksql/people.parquet")  
運行後/sparksql/目錄下就多出了一個名稱爲people.parquet的目錄:

而後,將people.parquet讀入,註冊成表parquetTable,查詢年紀大於25歲的人名:
[html]  view plain copy print ?
  1. //parquet演示  
  2. val parquetpeople = sqlContext.parquetFile("/sparksql/people.parquet")  
  3. parquetpeople.registerTempTable("parquetTable")  
  4.   
  5. sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)  
運行結果:

 
1.3:json文件
      sparkSQL1.1.0開始提供對json文件格式的支持,這意味着開發者可使用更多的數據源,如鼎鼎大名的NOSQL數據庫MongDB等。sqlContext能夠從jsonFile或jsonRDD獲取schema信息,來構建SchemaRDD,註冊成表後就可使用。
  • jsonFile - 加載JSON文件目錄中的數據,文件的每一行是一個JSON對象。 
  • jsonRdd - 從現有的RDD加載數據,其中RDD的每一個元素包含一個JSON對象的字符串。
      下面的例子讀入一個json文件/sparksql/people.json,註冊成jsonTable,並查詢年紀大於25歲的人名。

/sparksql/people.json的內容:
 
運行下面代碼:
[html]  view plain copy print ?
  1. //json演示  
  2. val jsonpeople = sqlContext.jsonFile("/sparksql/people.json")  
  3. jsonpeople.registerTempTable("jsonTable")  
  4.   
  5. sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)  
運行結果:


2:hiveContext基礎應用
      使用hiveContext以前首先要確認如下兩點:
  • 使用的Spark是支持hive
  • hive的配置文件hive-site.xml已經存在conf目錄中
      前者能夠查看lib目錄下是否存在以datanucleus開頭的3個JAR來肯定,後者注意是否在hive-site.xml裏配置了uris來訪問hive metastore。

要使用hiveContext,須要先構建hiveContext:
[html]  view plain copy print ?
  1. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  
而後就能夠對hive數據進行操做了,下面咱們將使用hive中的銷售數據(第五小結中的hive數據),首先切換數據庫到saledata並查看有幾個表:
[html]  view plain copy print ?
  1. hiveContext.sql("use saledata")  
  2. hiveContext.sql("show tables").collect().foreach(println)  
能夠看到有在第五小節定義的3個表:

 
如今查詢一下全部訂單中每一年的銷售單數、銷售總額:
[html]  view plain copy print ?
  1. //全部訂單中每一年的銷售單數、銷售總額  
  2. //三個錶鏈接後以count(distinct a.ordernumber)計銷售單數,sum(b.amount)計銷售總額  
  3. hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)  
運行結果:

 
再作一個稍微複雜點的查詢,求出全部訂單每一年最大金額訂單的銷售額:
[html]  view plain copy print ?
  1. /************************  
  2. 全部訂單每一年最大金額訂單的銷售額:  
  3. 第一步,先求出每份訂單的銷售額以其發生時間  
  4. select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber   
  5. 第二步,以第一步的查詢做爲子表,和表tblDate鏈接,求出每一年最大金額訂單的銷售額  
  6. select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear  
  7. *************************/  
  8.   
  9. hiveContext.sql("select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)  
運行結果:

 
最後作一個更復雜的查詢,求出全部訂單中每一年最暢銷貨品:
[html]  view plain copy print ?
  1. /************************  
  2. 全部訂單中每一年最暢銷貨品:  
  3. 第一步:求出每一年每一個貨品的銷售金額  
  4. select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid  
  5.   
  6. 第二步:求出每一年單品銷售的最大金額  
  7. select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear  
  8.   
  9. 第三步:求出每一年與銷售額最大相符的貨品就是最暢銷貨品  
  10. select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear  
  11. *************************/  
  12.   
  13. hiveContext.sql("select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)  
運行結果:

 
3:混合使用
      在sqlContext或hiveContext中,來源於不一樣數據源的表在各自生命週期中能夠混用,可是不一樣實例之間的表不能混合使用。

3.1 sqlContext中混合使用:
[html]  view plain copy print ?
  1. //sqlContext中混合使用  
  2. //sqlContext中來自rdd的表rddTable和來自parquet文件的表parquetTable混合使用  
  3. sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)  
運行結果:

  
3.2 hiveContext中混合使用:
[html]  view plain copy print ?
  1. //hiveContext中混合使用  
  2. //建立一個hiveTable,並將數據加載,注意people.txt第二列有空格,因此age取string類型  
  3. hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ")  
  4. hiveContext.sql("LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/people.txt' INTO TABLE hiveTable")  
  5.   
  6. //建立一個源自parquet文件的表parquetTable2,而後和hiveTable混合使用  
  7. hiveContext.parquetFile("/sparksql/people.parquet").registerAsTable("parquetTable2")  
  8. hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)  
運行結果:

 
4:緩存之使用
      sparkSQL的cache可使用兩種方法來實現:
  • cacheTable()方法
  • CACHE TABLE命令
      千萬不要先使用cache SchemaRDD,而後registerAsTable ;使用RDD的cache()將使用原生態的cache,而不是針對SQL優化後的內存列存儲。看看cacheTable的源代碼:

 在默認的狀況下,內存列存儲的壓縮功能是關閉的,要使用壓縮功能須要配置變量COMPRESS_CACHED。

 在sqlContext裏能夠以下使用cache:
[html]  view plain copy print ?
  1. //sqlContext的cache使用  
  2. sqlContext.cacheTable("rddTable")  
  3. sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)  
  4.   
  5. sqlContext.sql("CACHE TABLE parquetTable")  
  6. sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)  
觀察webUI,能夠看到cache的信息。(注意cache是lazy的,要有action纔會實現;uncache是eager的,能夠當即實現)

 使用以下命令能夠取消cache:
[html]  view plain copy print ?
  1. sqlContext.uncacheTable("rddTable")  
  2. sqlContext.sql("UNCACHE TABLE parquetTable")  
一樣的,在hiveContext也可使用上面的方法cache或uncache(hiveContext繼承於sqlContext)。

5:DSL之使用
      sparkSQL除了支持HiveQL和SQL-92語法外,還支持DSL(Domain Specific Language)。在DSL中,使用scala符號'+標示符表示基礎表中的列,spark的execution engine會將這些標示符隱式轉換成表達式。另外能夠在API中找到不少DSL相關的方法,如where()、select()、limit()等等,詳細資料能夠查看catalyst模塊中的dsl子模塊,下面爲其中定義幾種經常使用方法:

 
關於DSL的使用,隨便舉個例子,結合DSL方法,很容易上手:
[html]  view plain copy print ?
  1. //DSL演示  
  2. val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)  
  3. teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)  

6:Tips
      上面介紹了sparkSQL的基礎應用,sparkSQL還在高速發展中,存在者很多缺陷,如:
  • scala2.10.4自己對case class有22列的限制,在使用RDD數據源的時候就會形成不方便;
  • sqlContext中3個表不能同時join,須要兩兩join後再join一次;
  • sqlContext中不能直接使用values插入數據;
  • 。。。
      總的來講,hiveContext仍是使人滿意,sqlContext就有些差強人意了。另外,順便提一句,在編寫sqlContext應用程序的時候,case class要定義在object以外。

七:ThriftServer和CLI

spark1.1相較於spark1.0,最大的差異就在於spark1.1增長了萬人期待的CLI和ThriftServer。使得hive用戶還有用慣了命令行的RDBMS數據庫管理員很容易地上手sparkSQL,在真正意義上進入了SQL時代。下面先簡單介紹其使用,限於時間關係,之後再附上源碼分析。

1:使人驚訝的CLI
      剛部署好spark1.1就火燒眉毛地先測試CLI(bin/spark-sql),對於習慣了sql命令行的本人,失去了shark後,對於sparkSQL1.0一度非常抵觸(其實對於開發調試人員來講,spark-shell纔是利器,能夠很方便地使用各個spark生態中的組件)。急切中,沒有關閉hive metastore服務,而後一個bin/spark-sql就進入了命令行,而後經過hive metastore就能夠直接對hive進行查詢了:
[html]  view plain copy print ?
  1. spark-sql> use saledata;  
  2. //全部訂單中每一年的銷售單數、銷售總額  
  3. spark-sql> select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear;  
運行結果:

順便地測試了一下hive0.13的語法(測試系統中使用的是hive0.13,spark1.1編譯的時候是hive0.12,毫無心外地,在CLI裏是不能使用hive0.13的語法,必須使用和spark匹配的hive版本的hive語法)。

1.1 CLI配置
      在使用CLI前,要先啓動hive metastore;而CLI的配置很是簡單,在conf/hive-site.xml中之須要指定hive metastore的uris就可使用了。如今要在客戶端wyy上使用spark-sql,配置conf/hive-site.xml以下:
[html]  view plain copy print ?
  1. <configuration>  
  2. <property>  
  3. <name>hive.metastore.uris</name>  
  4. <value>thrift://hadoop3:9083</value>  
  5. <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>  
  6. </property>  
  7. </configuration>  

1.2 CLI命令參數
經過 bin/spark-sql --help能夠查看CLI命令參數:
[html]  view plain copy print ?
  1. [hadoop@hadoop3 spark110]$ bin/spark-sql --help  
  2. Usage: ./bin/spark-sql [options] [cli option]  
  3. CLI options:  
  4.  -d,--define <keykey=value>          Variable subsitution to apply to hive  
  5.                                   commands. e.g. -d A=B or --define A=B  
  6.     --database <databasename>     Specify the database to use  
  7.  -e <quoted-query-string>         SQL from command line  
  8.  -f <filename>                    SQL from files  
  9.  -h <hostname>                    connecting to Hive Server on remote host  
  10.     --hiveconf <propertyproperty=value>   Use value for given property  
  11.     --hivevar <keykey=value>         Variable subsitution to apply to hive  
  12.                                   commands. e.g. --hivevar A=B  
  13.  -i <filename>                    Initialization SQL file  
  14.  -p <port>                        connecting to Hive Server on port number  
  15.  -S,--silent                      Silent mode in interactive shell  
  16.  -v,--verbose                     Verbose mode (echo executed SQL to the  
  17.                                   console)  
其中[options] 是CLI啓動一個SparkSQL應用程序的參數,若是不設置--master的話,將在啓動spark-sql的機器以local方式運行,只能經過http://機器名:4040進行監控;這部分參數,能夠參照 Spark1.0.0 應用程序部署工具spark-submit 的參數。
[cli option]是CLI的參數,經過這些參數,CLI能夠直接運行SQL文件、進入命令行運行SQL命令等等,相似之前的shark的用法。須要注意的是CLI不是使用JDBC鏈接,因此不能鏈接到ThriftServer;但能夠配置conf/hive-site.xml鏈接到hive的metastore,而後對hive數據進行查詢。

1.3 CLI使用
啓動spark-sql:
[html]  view plain copy print ?
  1. bin/spark-sql --master spark://hadoop1:7077 --executor-memory 3g  
在集羣監控頁面能夠看到啓動了SparkSQL應用程序:

這時就可使用HQL語句對hive數據進行查詢,另外,可使用COMMAND,如使用set進行設置參數:默認狀況下,sparkSQL shuffle的時候是200個partition,可使用以下命令修改這個參數:
[html]  view plain copy print ?
  1. SET spark.sql.shuffle.partitions=20;  
運行同一個查詢語句,參數改變後,Task(partition)的數量就由200變成了20。

基本上,在CLI可使用絕大多數的hive特性。 

2:ThriftServer
      ThriftServer是一個JDBC/ODBC接口,用戶能夠經過JDBC/ODBC鏈接ThriftServer來訪問SparkSQL的數據。ThriftServer在啓動的時候,會啓動了一個sparkSQL的應用程序,而經過JDBC/ODBC鏈接進來的客戶端共同分享這個sparkSQL應用程序的資源,也就是說不一樣的用戶之間能夠共享數據;ThriftServer啓動時還開啓一個偵聽器,等待JDBC客戶端的鏈接和提交查詢。因此,在配置ThriftServer的時候,至少要配置ThriftServer的主機名和端口,若是要使用hive數據的話,還要提供hive metastore的uris。

2.1 ThriftServer配置
      一般,ThriftServer能夠在conf/hive-site.xml中定義如下幾項配置,也可使用環境變量的方式進行配置(環境變量的優先級高於hive-site.xml)。
      下面是在實驗集羣中hadoop2上啓動ThriftServer的hive-site.xml配置:
[html]  view plain copy print ?
  1. <configuration>  
  2. <property>  
  3. <name>hive.metastore.uris</name>  
  4. <value>thrift://hadoop3:9083</value>  
  5. <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>  
  6. </property>  
  7.   
  8. <property>  
  9. <name>hive.server2.thrift.min.worker.threads</name>  
  10. <value>5</value>  
  11. <description>Minimum number of Thrift worker threads</description>  
  12. </property>  
  13.   
  14. <property>  
  15. <name>hive.server2.thrift.max.worker.threads</name>  
  16. <value>500</value>  
  17. <description>Maximum number of Thrift worker threads</description>  
  18. </property>  
  19.   
  20. <property>  
  21. <name>hive.server2.thrift.port</name>  
  22. <value>10000</value>  
  23. <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>  
  24. </property>  
  25.   
  26. <property>  
  27. <name>hive.server2.thrift.bind.host</name>  
  28. <value>hadoop2</value>  
  29. <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>  
  30. </property>  
  31. </configuration>  

2.2 ThriftServer命令參數
使用 sbin/start-thriftserver.sh --help能夠查看ThriftServer的命令參數:
[html]  view plain copy print ?
  1. [hadoop@hadoop3 spark110]$ sbin/start-thriftserver.sh --help  
  2. Usage: ./sbin/start-thriftserver [options] [thrift server options]  
  3. Thrift server options:  
  4.       Use value for given property  
其中[options] 是ThriftServer啓動一個SparkSQL應用程序的參數,若是不設置--master的話,將在啓動ThriftServer的機器以local方式運行,只能經過http://機器名:4040進行監控;這部分參數,能夠參照 Spark1.0.0 應用程序部署工具spark-submit 的參數。在集羣中提供ThriftServer的話,必定要配置master、executor-memory等參數。
[thrift server options]是ThriftServer的參數,可使用-d property=value的格式來定義;在實際應用上,由於參數比較多,一般使用conf/hive-site.xml配置。

2.3 ThriftServer使用
2.3.1 啓動ThriftServer
啓動ThriftServer,使之運行在spark集羣中:
[html]  view plain copy print ?
  1. sbin/start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 3g  
在集羣監控頁面能夠看到啓動了SparkSQL應用程序:

注意不要將hive.server2.thrift.bind.host配置能localhost,否則遠程客戶端不能鏈接。

2.3.2 遠程客戶端鏈接
切換到客戶端wyy,啓動bin/beeline,用!connect jdbc:hive2://hadoop2:10000 鏈接ThriftServer,由於沒有采用權限管理,因此用戶名用運行bin/beeline的用戶,密碼爲空:

而後,對tblstock進行下面操做:
  1. 切換數據庫saledata;
  2. cache table tblstock;
  3. 對tblstock計數;

由於首次操做,因此統計花了11.725秒,查看webUI,tblStock已經緩存:

而後啓動另一個遠程鏈接,切換到hadoop1, 啓動bin/beeline,用!connect jdbc:hive2://hadoop2:10000 鏈接ThriftServer,而後直接運行對tblstock計數(注意沒有進行數據庫的切換):

用時 0.664秒,再查看webUI中的stage:

Locality Level是PROCESS,顯然是使用了緩存表。
從上能夠看出,ThriftServer能夠鏈接多個JDBC/ODBC客戶端,並相互之間能夠共享數據。順便提一句,ThriftServer啓動後處於監聽狀態,用戶可使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。
 
2.3.3 代碼示例
      有了ThriftServer,開發人員能夠很是方便的使用JDBC/ODBC來訪問sparkSQL。下面是一個scala代碼,查詢表tblStockDetail ,返回amount>3000的單據號和交易金額:
[html]  view plain copy print ?
  1. package doc  
  2.   
  3. import java.sql.DriverManager  
  4.   
  5. object SQLJDBC {  
  6.   def main(args: Array[String]) {  
  7.     Class.forName("org.apache.hive.jdbc.HiveDriver")  
  8.     val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000", "hadoop", "")  
  9.     try {  
  10.       val statement = conn.createStatement  
  11.       val rs = statement.executeQuery("select ordernumber,amount from tblStockDetail where amount>3000")  
  12.       while (rs.next) {  
  13.         val ordernumber = rs.getString("ordernumber")  
  14.         val amount = rs.getString("amount")  
  15.         println("ordernumber = %s, amount = %s".format(ordernumber, amount))  
  16.       }  
  17.     } catch {  
  18.       case e: Exception => e.printStackTrace  
  19.     }  
  20.     conn.close  
  21.   }  
  22. }  
運行結果:

如需更詳細的信息,請參照: HiveServer2 Clients

3:小結
      總的來講,ThriftServer和CLI的引入,使得sparkSQL能夠更方便的使用hive數據,使得sparkSQL能夠更接近使用者,而非開發者。

八:sparkSQL之綜合應用

Spark之因此萬人矚目,除了內存計算,還有其ALL-IN-ONE的特性,實現了One stack rule them all。下面簡單模擬了幾個綜合應用場景,不只使用了sparkSQL,還使用了其餘Spark組件:
  • 店鋪分類,根據銷售額對店鋪分類,使用sparkSQL和MLLib
  • PageRank,計算最有價值的網頁,使用sparkSQL和GraphX
      前者將使用sparkSQL+MLlib的聚類算法,後者將使用sparkSQL+GraphX的PageRank算法。本實驗採用IntelliJ IDEA調試代碼,最後生成doc.jar,而後使用spark-submit提交給集羣運行。

1:店鋪分類
      分類在實際應用中很是廣泛,好比對客戶進行分類、對店鋪進行分類等等,對不一樣類別採起不一樣的策略,能夠有效的下降企業的營運成本、增長收入。機器學習中的聚類就是一種根據不一樣的特徵數據,結合用戶指定的類別數量,將數據分紅幾個類的方法。下面舉個簡單的例子,對第五小結中的hive數據,按照銷售數量和銷售金額這兩個特徵數據,進行聚類,分出3個等級的店鋪。
      在IDEA中創建一個object:SQLMLlib
[html]  view plain copy print ?
  1. package doc  
  2.   
  3. import org.apache.log4j.{Level, Logger}  
  4. import org.apache.spark.sql.catalyst.expressions.Row  
  5. import org.apache.spark.{SparkConf, SparkContext}  
  6. import org.apache.spark.sql.hive.HiveContext  
  7. import org.apache.spark.mllib.clustering.KMeans  
  8. import org.apache.spark.mllib.linalg.Vectors  
  9.   
  10. object SQLMLlib {  
  11.   def main(args: Array[String]) {  
  12.     //屏蔽沒必要要的日誌顯示在終端上  
  13.     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)  
  14.     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)  
  15.   
  16.     //設置運行環境  
  17.     val sparkConf = new SparkConf().setAppName("SQLMLlib")  
  18.     val sc = new SparkContext(sparkConf)  
  19.     val hiveContext = new HiveContext(sc)  
  20.   
  21.     //使用sparksql查出每一個店的銷售數量和金額  
  22.     hiveContext.sql("use saledata")  
  23.     hiveContext.sql("SET spark.sql.shuffle.partitions=20")  
  24.     val sqldata = hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tblStock a join tblstockdetail b on a.ordernumber=b.ordernumber group by a.locationid")  
  25.   
  26.     //將查詢數據轉換成向量  
  27.     val parsedData = sqldata.map {  
  28.       case Row(_, totalqty, totalamount) =>  
  29.         val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)  
  30.         Vectors.dense(features)  
  31.     }  
  32.   
  33.     //對數據集聚類,3個類,20次迭代,造成數據模型  
  34.     //注意這裏會使用設置的partition數20  
  35.     val numClusters = 3  
  36.     val numIterations = 20  
  37.     val model = KMeans.train(parsedData, numClusters, numIterations)  
  38.   
  39.     //用模型對讀入的數據進行分類,並輸出  
  40.     //因爲partition沒設置,輸出爲200個小文件,可使用bin/hdfs dfs -getmerge 合併下載到本地  
  41.     val result2 = sqldata.map {  
  42.       case Row(locationid, totalqty, totalamount) =>  
  43.         val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)  
  44.         val linevectore = Vectors.dense(features)  
  45.         val prediction = model.predict(linevectore)  
  46.         locationid + " " + totalqty + " " + totalamount + " " + prediction  
  47.     }.saveAsTextFile(args(0))  
  48.   
  49.     sc.stop()  
  50.   }  
  51. }  
編譯打包後,複製到spark安裝目錄下運行:
[html]  view plain copy print ?
  1. cp /home/mmicky/IdeaProjects/doc/out/artifacts/doc/doc.jar .  
  2. bin/spark-submit --master spark://hadoop1:7077 
相關文章
相關標籤/搜索