很久沒更新博客了,以前學了一些R語言和機器學習的內容,作了一些筆記,以後也會放到博客上面來給你們共享。一個月前就打算更新Spark Sql的內容了,由於一些別的事情耽誤了,今天就簡單寫點,Spark1.2立刻就要出來了,不知道變更會不會很大,聽說添加了不少的新功能呢,期待中...html
首先聲明一下這個版本的代碼是1.1的,以前講的都是1.0的。sql
Spark支持兩種模式,一種是在spark裏面直接寫sql,能夠經過sql來查詢對象,相似.net的LINQ同樣,另一種支持hive的HQL。不論是哪一種方式,下面提到的步驟都會有,不一樣的是具體的執行過程。下面就說一下這個過程。數據庫
Sql解析成LogicPlanapache
使用Idea的快捷鍵Ctrl + Shift + N打開SQLQuerySuite文件,進行調試吧。api
def sql(sqlText: String): SchemaRDD = { if (dialect == "sql") { new SchemaRDD(this, parseSql(sqlText)) } else { sys.error(s"Unsupported SQL dialect: $dialect") } }
從這裏能夠看出來,第一步是解析sql,最後把它轉換成一個SchemaRDD。點擊進入parseSql函數,發現解析Sql的過程在SqlParser這個類裏面。
在SqlParser的apply方法裏面,咱們能夠看到else語句裏面的這段代碼。緩存
//對input進行解析,符合query的模式的就返回Success phrase(query)(new lexical.Scanner(input)) match { case Success(r, x) => r case x => sys.error(x.toString) }
這裏咱們主要關注query就能夠。app
protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache )
這裏面有不少看不懂的操做符,請到下面這個網址裏面去學習。這裏能夠看出來它目前支持的sql語句只是select和insert。機器學習
http://www.scala-lang.org/api/2.10.4/index.html#scala.util.parsing.combinator.Parsers$Parseride
咱們繼續查看select。函數
// ~>只保留右邊的模式 opt可選的 ~按順序合成 <~只保留左邊的 protected lazy val select: Parser[LogicalPlan] = SELECT ~> opt(DISTINCT) ~ projections ~ opt(from) ~ opt(filter) ~ opt(grouping) ~ opt(having) ~ opt(orderBy) ~ opt(limit) <~ opt(";") ^^ { case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l => val base = r.getOrElse(NoRelation) val withFilter = f.map(f => Filter(f, base)).getOrElse(base) val withProjection = g.map {g => Aggregate(assignAliases(g), assignAliases(p), withFilter) }.getOrElse(Project(assignAliases(p), withFilter)) val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection) val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct) val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving) val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder) withLimit }
能夠看得出來它對sql的解析是和咱們經常使用的sql寫法是一致的,這裏面再深刻下去還有遞歸,並非看起來那麼好理解。這裏就不繼續講下去了,在解析hive的時候我會重點講一下,我認爲目前你們使用得更可能是仍然是來源於hive的數據集,畢竟hive那麼穩定。
到這裏咱們能夠知道第一步是經過Parser把sql解析成一個LogicPlan。
LogicPlan到RDD的轉換過程
好,下面咱們回到剛纔的代碼,接着咱們應該看SchemaRDD。
override def compute(split: Partition, context: TaskContext): Iterator[Row] = firstParent[Row].compute(split, context).map(_.copy()) override def getPartitions: Array[Partition] = firstParent[Row].partitions override protected def getDependencies: Seq[Dependency[_]] = List(new OneToOneDependency(queryExecution.toRdd))
SchemaRDD是一個RDD的話,那麼它最重要的3個屬性:compute函數,分區,依賴全在這裏面,其它的函數咱們就不看了。
挺奇怪的是,咱們new出來的RDD,怎麼會有依賴呢,這個queryExecution是啥,點擊進去看看吧,代碼跳轉到SchemaRDD繼承的SchemaRDDLike裏面。
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
把這兩段很短的代碼都放一塊兒了,executePlan方法就是new了一個QueryExecution出來,那咱們繼續看看QueryExecution這個類吧。
lazy val analyzed = ExtractPythonUdfs(analyzer(logical)) lazy val optimizedPlan = optimizer(analyzed) lazy val sparkPlan = { SparkPlan.currentContext.set(self) planner(optimizedPlan).next() } // 在須要的時候加入Shuffle操做
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) lazy val toRdd: RDD[Row] = executedPlan.execute()
從這裏能夠看出來LogicPlan是通過了5個步驟的轉換,要被analyzer和optimizer的處理,而後轉換成SparkPlan,在執行以前還要被prepareForExecution處理一下,最後調用execute方法轉成RDD.
下面咱們分步講這些個東東究竟是幹啥了。
首先咱們看看Anayzer,它是繼承自RuleExecutor的,這裏插句題外話,Spark sql的做者Michael Armbrust在2013年的Spark Submit上介紹Catalyst的時候,就說到要從總體地去優化一個sql的執行是很困難的,全部設計成這種基於一個一個小規則的這種優化方式,既簡單又方便維護。
好,咱們接下來看看RuleExecutor的apply方法。
def apply(plan: TreeType): TreeType = { var curPlan = plan //規則還分批次的,分批對plan進行處理 batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 var lastPlan = curPlan var continue = true // Run until fix point (or the max number of iterations as specified in the strategy. while (continue) { //用batch種的小規則從左到右挨個對plan進行處理 curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => val result = rule(plan) result } iteration += 1 //超過了規定的迭代次數就要退出的 if (iteration > batch.strategy.maxIterations) { continue = false } //通過處理成功的plan是會發生改變的,若是和上一次處理接觸的plan同樣,這說明已經沒有優化空間了,能夠結束,這個就是前面提到的Fixed point if (curPlan.fastEquals(lastPlan)) { continue = false } lastPlan = curPlan } } curPlan }
看完了RuleExecutor,咱們繼續看Analyzer,下面我只貼出來batches這塊的代碼,剩下的要本身去看了哦。
val batches: Seq[Batch] = Seq( //碰到繼承自MultiInstanceRelations接口的LogicPlan時,發現id之後重複的,就強制要求它們生成一個新的全局惟一的id //涉及到InMemoryRelation、LogicRegion、ParquetRelation、SparkLogicPlan Batch("MultiInstanceRelations", Once, NewRelationInstances), //若是大小寫不敏感就把屬性都變成小寫 Batch("CaseInsensitiveAttributeReferences", Once, (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*), //這個牛逼啊,竟然想迭代100次的。 Batch("Resolution", fixedPoint, //解析從子節點的操做生成的屬性,通常是別名引發的,好比a.id ResolveReferences :: //經過catalog解析表名 ResolveRelations :: //在select語言裏,order by的屬性每每在前面沒寫,查詢的時候也須要把這些字段查出來,排序完畢以後再刪除 ResolveSortReferences :: //前面講過了 NewRelationInstances :: //清除被誤認爲別名的屬性,好比sum(score) as a,其實它應該是sum(score)纔對 //它被解析的時候解析成Project(Seq(Alias(g: Generator, _)),直接返回Generator就能夠了 ImplicitGenerate :: //處理語句中的*,好比select *, count(*) StarExpansion :: //解析函數 ResolveFunctions :: //解析全局的聚合函數,好比select sum(score) from table GlobalAggregates :: //解析having子句後面的聚合過濾條件,好比having sum(score) > 400 UnresolvedHavingClauseAttributes :: //typeCoercionRules是hive的類型轉換規則 typeCoercionRules :_*), //檢查全部節點的屬性是否都已經處理完畢了,若是還有沒解析出來的屬性,這裏就會報錯! Batch("Check Analysis", Once, CheckResolution), //清除多餘的操做符,如今是Subquery和LowerCaseSchema, //第一個是子查詢,第二個HiveContext查詢樹裏面把子節點所有轉換成小寫 Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) )
能夠看得出來Analyzer是把Unresolved的LogicPlan解析成resolved的,解析裏面的表名、字段、函數、別名什麼的。
咱們接着看Optimizer, 從單詞上看它是用來作優化的,可是從代碼上來看它更多的是爲了過濾咱們寫的一些垃圾語句,並無作什麼實際的優化。
object Optimizer extends RuleExecutor[LogicalPlan] { val batches = //遞歸合併相鄰的兩個limit Batch("Combine Limits", FixedPoint(100), CombineLimits) :: Batch("ConstantFolding", FixedPoint(100), //替換null值 NullPropagation, //替換一些簡單的常量表達式,好比 1 in (1,2) 直接返回一個true就能夠了 ConstantFolding, //簡化like語句,避免全表掃描,目前支持'%demo%', '%demo','demo*','demo' LikeSimplification, //簡化過濾條件,好比true and score > 0 直接替換成score > 0 BooleanSimplification, //簡化filter,好比where 1=1 或者where 1=2,前者直接去掉這個過濾,後者這個查詢就不必作了 SimplifyFilters, //簡化轉換,好比兩個比較字段的數據類型是同樣的,就不須要轉換了 SimplifyCasts, //簡化大小寫轉換,好比Upper(Upper('a'))轉爲認爲是Upper('a') SimplifyCaseConversionExpressions) :: Batch("Filter Pushdown", FixedPoint(100), //遞歸合併相鄰的兩個過濾條件 CombineFilters, //把從表達式裏面的過濾替換成,先作過濾再取表達式,而且掉過濾裏面的別名屬性 //典型的例子 select * from (select a,b from table) where a=1 //替換成select * from (select a,b from table where a=1) PushPredicateThroughProject, //把join的on條件中能夠在原表當中作過濾的先作過濾 //好比select a,b from x join y on x.id = y.id and x.a >0 and y.b >0 //這個語句能夠改寫爲 select a,b from x where x.a > 0 join (select * from y where y.b >0) on x.id = y.id PushPredicateThroughJoin, //去掉一些用不上的列 ColumnPruning) :: Nil }
真是用心良苦啊,看來咱們寫sql的時候仍是要注意一點的,你看人家花多大的功夫來優化咱們的爛sql。。。要是我確定不優化。。。寫得爛就慢去吧!
接下來,就改看這一句了planner(optimizedPlan).next() 咱們先看看SparkPlanner吧。
protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext: SparkContext = self.sparkContext val sqlContext: SQLContext = self def codegenEnabled = self.codegenEnabled def numPartitions = self.numShufflePartitions //把LogicPlan轉換成實際的操做,具體操做類在org.apache.spark.sql.execution包下面 val strategies: Seq[Strategy] = //把cache、set、expain命令轉化爲實際的Command CommandStrategy(self) :: //把limit轉換成TakeOrdered操做 TakeOrdered :: //名字有點蠱惑人,就是轉換聚合操做 HashAggregation :: //left semi join只顯示鏈接條件成立的時候鏈接左邊的表的信息 //好比select * from table1 left semi join table2 on(table1.student_no=table2.student_no); //它只顯示table1中student_no在表二當中的信息,它能夠用來替換exist語句 LeftSemiJoin :: //等值鏈接操做,有些優化的內容,若是表的大小小於spark.sql.autoBroadcastJoinThreshold設置的字節 //就自動轉換爲BroadcastHashJoin,即把表緩存,相似hive的map join(順序是先判斷右表再判斷右表)。 //這個參數的默認值是10000 //另外作內鏈接的時候還會判斷左表右表的大小,shuffle取數據大表不動,從小表拉取數據過來計算 HashJoin :: //在內存裏面執行select語句進行過濾,會作緩存 InMemoryScans :: //和parquet相關的操做 ParquetOperations :: //基本的操做 BasicOperators :: //沒有條件的鏈接或者內鏈接作笛卡爾積 CartesianProduct :: //把NestedLoop鏈接進行廣播鏈接 BroadcastNestedLoopJoin :: Nil ...... }
這一步是把邏輯計劃轉換成物理計劃,或者說是執行計劃了,裏面有不少概念是我之前沒聽過的,網上查了一下才知道,原來數據庫的執行計劃還有那麼多的說法,這一塊須要是專門研究數據庫的人比較瞭解了。剩下的兩步就是prepareForExecution和execute操做。
prepareForExecution操做是檢查物理計劃當中的Distribution是否知足Partitioning的要求,若是不知足的話,須要從新弄作分區,添加shuffle操做,這塊暫時沒咋看懂,之後還須要仔細研究。最後調用SparkPlan的execute方法,這裏面稍微講講這塊的樹型結構。
sql解析出來就是一個二叉樹的結構,不論是邏輯計劃仍是物理計劃,都是這種結構,因此在代碼裏面能夠看到LogicPlan和SparkPlan的具體實現類都是有繼承上面圖中的三種類型的節點的。
非LeafNode的SparkPlan的execute方法都會有這麼一句child.execute(),由於它須要先執行子節點的execute來返回數據,執行的過程是一個先序遍歷。
最後把這個過程也用一個圖來表示吧,方便記憶。
(1)經過一個Parser來把sql語句轉換成Unresolved LogicPlan,目前有兩種Parser,SqlParser和HiveQl。
(2)經過Analyzer把LogicPlan當中的Unresolved的內容給解析成resolved的,這裏麪包括表名、函數、字段、別名等。
(3)經過Optimizer過濾掉一些垃圾的sql語句。
(4)經過Strategies把邏輯計劃轉換成能夠具體執行的物理計劃,具體的類有SparkStrategies和HiveStrategies。
(5)在執行前用prepareForExecution方法先檢查一下。
(6)先序遍歷,調用執行計劃樹的execute方法。
岑玉海
轉載請註明出處,謝謝!