Spark SQL模塊,主要就是處理跟SQL解析相關的一些內容,說得更通俗點就是怎麼把一個SQL語句解析成Dataframe或者說RDD的任務。以Spark 2.4.3爲例,Spark SQL這個大模塊分爲三個子模塊,以下圖所示html
其中Catalyst能夠說是Spark內部專門用來解析SQL的一個框架,在Hive中相似的框架是Calcite(將SQL解析成MapReduce任務)。Catalyst將SQL解析任務分紅好幾個階段,這個在對應的論文中講述得比較清楚,本系列不少內容也會參考論文,有興趣閱讀原論文的能夠到這裏看:Spark SQL: Relational Data Processing in Spark。sql
而Core模塊其實就是Spark SQL主要解析的流程,固然這個過程當中會去調用Catalyst的一些內容。這模塊裏面比較經常使用的類包括SparkSession,DataSet等。shell
至於hive模塊,這個不用說,確定跟hive有關的。這個模塊在本系列基本不會涉及到,就很少介紹了。數據庫
值得一提的是,論文發表的時候仍是在Spark1.x階段,那個時候SQL解析成詞法樹用的是scala寫的一個解析工具,到2.x階段改成使用antlr4來作這部分工做(這應該算是最大的改變)。至於爲何要改,我猜是出於可讀性和易用性方面的考慮,固然這個僅是我的猜想。apache
另外,這一系列會簡單介紹一條SQL語句的處理流程,基於spark 2.4.3(sql這個模塊在spark2.1後變化不大)。這一篇先從總體介紹Spark SQL出現的背景及解決問題,Dataframe API以及Catalyst的流程大概是怎麼樣,後面分階段細說Catalyst的流程。編程
在最先的時候,大規模處理數據的技術是MapReduce,但這種框架執行效率太慢,進行一些關係型處理(如join)須要編寫大量代碼。後來hive這種框架可讓用戶輸入sql語句,自動進行優化並執行。json
但在大型系統中,任然有兩個主要問題,一個是ETL操做須要對接多個數據源。另外一個是用戶須要執行復雜分析,好比機器學習和圖計算等。但傳統的關係型處理系統中較難實現。api
Spark SQL提供了兩個子模塊來解決這個問題,DataFrame API和Catalyst。緩存
相比於RDD,Dataframe api提供了更加豐富的關係型api,而且能和RDD相互轉換,後面Spark機器學習方面的工做重心,也從以RDD爲基礎的mllib轉移到以Dataframe爲基礎的Spark ML(雖然Dataframe底層也是RDD)。session
另外一個就是Catalyst,經過它能夠輕鬆爲諸如機器學習之類的域添加數據源(好比json或經過case class自定義的類型),優化規則和數據類型。
經過這兩個模塊,Spark SQL主要實現如下目標:
那下面就介紹Dataframe和Catalyst的流程,固然主要討論的仍是Catalyst。
先來看看論文裏面提供的一張圖:
這張圖能夠說明不少,首先Spark的Dataframe API底層也是基於Spark的RDD。但與RDD不一樣的在於,Dataframe會持有schema(這個實在很差翻譯,能夠理解爲數據的結構吧),以及能夠執行各類各樣的關係型操做,好比Select,Filter,Join,Groupby等。從操做上來講,和pandas的Dataframe有點像(連名字都是同樣的)。
同時由於是基於RDD的,因此不少RDD的特性Dataframe都可以享受到,好比說分佈式計算中一致性,可靠性方面的保證,以及能夠經過cache緩存數據,提升計算性能啊等等。
同時圖中頁展現了Dataframe能夠經過JDBC連接外部數據庫,經過控制檯操做(spark-shell),或者用戶程序。說白了,就是Dataframe能夠經過RDD轉換而來,也能夠經過外部數據表生成。
對了,這裏順便說一句,不少初次接觸Spark SQL的童鞋可能會對Dataset和Dataframe這兩個東西感到疑惑,在1.x時代它們確實有些差異,不過在spark2.x的時候,這兩個API已經統一了。因此基本上Dataset和Dataframe能夠當作是等價的東西。
最後仍是結合代碼作一下實際的展現吧,以下展現生成一個RDD,而且根據這個RDD生成對應的Dataframe,從中能夠看出RDD和Dataframe的區別:
//生成RDD scala> val data = sc.parallelize(Array((1,2),(3,4))) data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> data.foreach(println) (1,2) (3,4) scala> val df = data.toDF("fir","sec") df: org.apache.spark.sql.DataFrame = [fir: int, sec: int] scala> df.show() +---+---+ |fir|sec| +---+---+ | 1| 2| | 3| 4| +---+---+ //跟RDD相比,多了schema scala> df.printSchema() root |-- fir: integer (nullable = false) |-- sec: integer (nullable = false)
Catalyst在論文中被叫作優化器(Optimizer),這部分是論文裏面較爲核心的內容,不過其實流程仍是蠻好理解的,依舊貼下論文裏面的圖。
主要流程大概能夠分爲如下幾步:
提早說一下吧,上述流程多數是在org.apache.spark.sql.execution.QueryExecution這個類裏面,這個貼一下簡單的代碼,看看就好,先很少作深究。後面的文章會詳細介紹這裏的內容。
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其餘代碼 //analyzer階段 lazy val analyzed: LogicalPlan = { SparkSession.setActiveSession(sparkSession) sparkSession.sessionState.analyzer.executeAndCheck(logical) } //optimizer階段 lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData) //SparkPlan階段 lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } //prepareForExecution階段 // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //execute階段 /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() ......其餘代碼 }
值得一提的是每一個階段都使用了lazy懶加載,對這塊感興趣能夠看看我以前的文章Scala函數式編程(六) 懶加載與Stream。
上述主要介紹Spark SQL模塊內容,其出現的背景以及主要解決問題。然後簡單介紹下Dataframe API的內容,以及Spark SQL解析SQL的內部框架Catalyst。後續主要會介紹Catalyst中各個步驟的流程,結合源碼來作一些分析。
以上~