Hive是基於Hadoop的一個數據倉庫系統,在各大公司都有普遍的應用。美團數據倉庫也是基於Hive搭建,天天執行近萬次的Hive ETL計算流程,負責天天數百GB的數據存儲和分析。Hive的穩定性和性能對咱們的數據分析很是關鍵。python
在幾回升級Hive的過程當中,咱們遇到了一些大大小小的問題。經過向社區的諮詢和本身的努力,在解決這些問題的同時咱們對Hive將SQL編譯爲MapReduce的過程有了比較深刻的理解。對這一過程的理解不只幫助咱們解決了一些Hive的bug,也有利於咱們優化Hive SQL,提高咱們對Hive的掌控力,同時有能力去定製一些須要的功能。sql
詳細講解SQL編譯爲MapReduce以前,咱們先來看看MapReduce框架實現SQL基本操做的原理apache
select u.name, o.orderid from order o join user u on o.uid = u.uid;
在map的輸出value中爲不一樣表的數據打上tag標記,在reduce階段根據tag判斷數據來源。MapReduce的過程以下(這裏只是說明最基本的Join的實現,還有其餘的實現方式)數組
select rank, isonline, count(*) from city group by rank, isonline;
將GroupBy的字段組合爲map的輸出key值,利用MapReduce的排序,在reduce階段保存LastKey區分不一樣的key。MapReduce的過程以下(固然這裏只是說明Reduce端的非Hash聚合過程)緩存
select dealid, count(distinct uid) num from order group by dealid;
當只有一個distinct字段時,若是不考慮Map階段的Hash GroupBy,只須要將GroupBy字段和Distinct字段組合爲map輸出key,利用mapreduce的排序,同時將GroupBy字段做爲reduce的key,在reduce階段保存LastKey便可完成去重app
若是有多個distinct字段呢,以下面的SQL框架
select dealid, count(distinct uid), count(distinct date) from order group by dealid;
實現方式有兩種:編輯器
(1)若是仍然按照上面一個distinct字段的方法,即下圖這種實現方式,沒法跟據uid和date分別排序,也就沒法經過LastKey去重,仍然須要在reduce階段在內存中經過Hash去重ide
(2)第二種實現方式,能夠對全部的distinct字段編號,每行數據生成n行數據,那麼相同字段就會分別排序,這時只須要在reduce階段記錄LastKey便可去重。工具
這種實現方式很好的利用了MapReduce的排序,節省了reduce階段去重的內存消耗,可是缺點是增長了shuffle的數據量。
須要注意的是,在生成reduce value時,除第一個distinct字段所在行須要保留value值,其他distinct數據行value字段都可爲空。
瞭解了MapReduce實現SQL基本操做以後,咱們來看看Hive是如何將SQL轉化爲MapReduce任務的,整個編譯過程分爲六個階段:
下面分別對這六個階段進行介紹
Hive使用Antlr實現SQL的詞法和語法解析。Antlr是一種語言識別的工具,能夠用來構造領域語言。
這裏不詳細介紹Antlr,只須要了解使用Antlr構造特定的語言只須要編寫一個語法文件,定義詞法和語法替換規則便可,Antlr完成了詞法分析、語法分析、語義分析、中間代碼生成的過程。
Hive中語法規則的定義文件在0.10版本之前是Hive.g一個文件,隨着語法規則愈來愈複雜,由語法規則生成的Java解析類可能超過Java類文件的最大上限,0.11版本將Hive.g拆成了5個文件,詞法規則HiveLexer.g和語法規則的4個文件SelectClauseParser.g,FromClauseParser.g,IdentifiersParser.g,HiveParser.g。
通過詞法和語法解析後,若是須要對錶達式作進一步的處理,使用 Antlr 的抽象語法樹語法Abstract Syntax Tree,在語法分析的同時將輸入語句轉換成抽象語法樹,後續在遍歷語法樹時完成進一步的處理。
下面的一段語法是Hive SQL中SelectStatement的語法規則,從中能夠看出,SelectStatement包含select, from, where, groupby, having, orderby等子句。
(在下面的語法規則中,箭頭表示對於原語句的改寫,改寫後會加入一些特殊詞標示特定語法,好比TOK_QUERY標示一個查詢塊)
selectStatement : selectClause fromClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause? distributeByClause? sortByClause? limitClause? -> ^(TOK_QUERY fromClause ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE)) selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause? distributeByClause? sortByClause? limitClause?)) ;
爲了詳細說明SQL翻譯爲MapReduce的過程,這裏以一條簡單的SQL爲例,SQL中包含一個子查詢,最終將數據寫入到一張表中
FROM ( SELECT p.datekey datekey, p.userid userid, c.clienttype FROM detail.usersequence_client c JOIN fact.orderpayment p ON p.orderid = c.orderid JOIN default.user du ON du.userid = p.userid WHERE p.datekey = 20131118 ) base INSERT OVERWRITE TABLE `test`.`customer_kpi` SELECT base.datekey, base.clienttype, count(distinct base.userid) buyer_count GROUP BY base.datekey, base.clienttype
Antlr對Hive SQL解析的代碼以下,HiveLexerX,HiveParser分別是Antlr對語法文件Hive.g編譯後自動生成的詞法解析和語法解析類,在這兩個類中進行復雜的解析。
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command)); //詞法解析,忽略關鍵詞的大小寫 TokenRewriteStream tokens = new TokenRewriteStream(lexer); if (ctx != null) { ctx.setTokenRewriteStream(tokens); } HiveParser parser = new HiveParser(tokens); //語法解析 parser.setTreeAdaptor(adaptor); HiveParser.statement_return r = null; try { r = parser.statement(); //轉化爲AST Tree } catch (RecognitionException e) { e.printStackTrace(); throw new ParseException(parser.errors); }
最終生成的AST Tree以下圖右側(使用Antlr Works生成,Antlr Works是Antlr提供的編寫語法文件的編輯器),圖中只是展開了骨架的幾個節點,沒有徹底展開。
子查詢1/2,分別對應右側第1/2兩個部分。
這裏注意一下內層子查詢也會生成一個TOK_DESTINATION節點。請看上面SelectStatement的語法規則,這個節點是在語法改寫中特地增長了的一個節點。緣由是Hive中全部查詢的數據均會保存在HDFS臨時的文件中,不管是中間的子查詢仍是查詢最終的結果,Insert語句最終會將數據寫入表所在的HDFS目錄下。
詳細來看,將內存子查詢的from子句展開後,獲得以下AST Tree,每一個表生成一個TOK_TABREF節點,Join條件生成一個「=」節點。其餘SQL部分相似,不一一詳述。
AST Tree仍然很是複雜,不夠結構化,不方便直接翻譯爲MapReduce程序,AST Tree轉化爲QueryBlock就是將SQL進一部抽象和結構化。
QueryBlock是一條SQL最基本的組成單元,包括三個部分:輸入源,計算過程,輸出。簡單來說一個QueryBlock就是一個子查詢。
下圖爲Hive中QueryBlock相關對象的類圖,解釋圖中幾個重要的屬性
AST Tree生成QueryBlock的過程是一個遞歸的過程,先序遍歷AST Tree,遇到不一樣的Token節點,保存到相應的屬性中,主要包含如下幾個過程
aliasToTabs
等屬性中destToSelExpr
、destToAggregationExprs
、destToDistinctFuncExprs
三個屬性中最終樣例SQL生成兩個QB對象,QB對象的關係以下,QB1是外層查詢,QB2是子查詢
QB1 \ QB2
Hive最終生成的MapReduce任務,Map階段和Reduce階段均由OperatorTree組成。邏輯操做符,就是在Map階段或者Reduce階段完成單一特定的操做。
基本的操做符包括TableScanOperator,SelectOperator,FilterOperator,JoinOperator,GroupByOperator,ReduceSinkOperator
從名字就能猜出各個操做符完成的功能,TableScanOperator從MapReduce框架的Map接口原始輸入表的數據,控制掃描表的數據行數,標記是從原表中取數據。JoinOperator完成Join操做。FilterOperator完成過濾操做
ReduceSinkOperator將Map端的字段組合序列化爲Reduce Key/value, Partition Key,只可能出如今Map階段,同時也標誌着Hive生成的MapReduce程序中Map階段的結束。
Operator在Map Reduce階段之間的數據傳遞都是一個流式的過程。每個Operator對一行數據完成操做後以後將數據傳遞給childOperator計算。
Operator類的主要屬性和方法以下
QueryBlock生成Operator Tree就是遍歷上一個過程當中生成的QB和QBParseInfo對象的保存語法的屬性,包含以下幾個步驟:
因爲Join/GroupBy/OrderBy均須要在Reduce階段完成,因此在生成相應操做的Operator以前都會先生成一個ReduceSinkOperator,將字段組合並序列化爲Reduce Key/value, Partition Key
接下來詳細分析樣例SQL生成OperatorTree的過程
先序遍歷上一個階段生成的QB對象
首先根據子QueryBlock QB2#aliasToTabs {du=dim.user, c=detail.usersequence_client, p=fact.orderpayment}
生成TableScanOperator
TableScanOperator(「dim.user」) TS[0] TableScanOperator(「detail.usersequence_client」) TS[1] TableScanOperator(「fact.orderpayment」) TS[2]
先序遍歷QBParseInfo#joinExpr
生成QBJoinTree
,類QBJoinTree
也是一個樹狀結構,QBJoinTree
保存左右表的ASTNode和這個查詢的別名,最終生成的查詢樹以下
base / \ p du / \ c p
QBJoinTree
,先生成detail.usersequence_client
和fact.orderpayment
的Join操做樹圖中 TS=TableScanOperator RS=ReduceSinkOperator JOIN=JoinOperator
QBParseInfo#destToWhereExpr
生成FilterOperator
。此時QB2遍歷完成。下圖中SelectOperator在某些場景下會根據一些條件判斷是否須要解析字段。
圖中 FIL= FilterOperator SEL= SelectOperator
圖中 GBY= GroupByOperator
GBY[12]是HASH聚合,即在內存中經過Hash進行聚合運算
圖中FS=FileSinkOperator
大部分邏輯層優化器經過變換OperatorTree,合併操做符,達到減小MapReduce Job,減小shuffle數據量的目的。
名稱 |
做用 |
---|---|
② SimpleFetchOptimizer |
優化沒有GroupBy表達式的聚合查詢 |
② MapJoinProcessor |
MapJoin,須要SQL中提供hint,0.11版本已不用 |
② BucketMapJoinOptimizer |
BucketMapJoin |
② GroupByOptimizer |
Map端聚合 |
① ReduceSinkDeDuplication |
合併線性的OperatorTree中partition/sort key相同的reduce |
① PredicatePushDown |
謂詞前置 |
① CorrelationOptimizer |
利用查詢中的相關性,合併有相關性的Job,HIVE-2206 |
ColumnPruner |
字段剪枝 |
表格中①的優化器均是一個Job幹盡量多的事情/合併。②的都是減小shuffle數據量,甚至不作Reduce。
CorrelationOptimizer優化器很是複雜,都能利用查詢中的相關性,合併有相關性的Job,參考 Hive Correlation Optimizer
對於樣例SQL,有兩個優化器對其進行優化。下面分別介紹這兩個優化器的做用,並補充一個優化器ReduceSinkDeDuplication的做用
斷言判斷提早優化器將OperatorTree中的FilterOperator提早到TableScanOperator以後
NonBlockingOpDeDupProc
優化器合併SEL-SEL 或者 FIL-FIL 爲一個Operator
ReduceSinkDeDuplication能夠合併線性相連的兩個RS。實際上CorrelationOptimizer是ReduceSinkDeDuplication的超集,能合併線性和非線性的操做RS,可是Hive先實現的ReduceSinkDeDuplication
譬以下面這條SQL語句
from (select key, value from src group by key, value) s select s.key group by s.key;
通過前面幾個階段以後,會生成以下的OperatorTree,兩個Tree是相連的,這裏沒有畫到一塊兒
這時候遍歷OperatorTree後能發現前先後兩個RS輸出的Key值和PartitionKey以下
|
Key |
PartitionKey |
---|---|---|
childRS |
key |
key |
parentRS |
key,value |
key,value |
ReduceSinkDeDuplication優化器檢測到:1. pRS Key徹底包含cRS Key,且排序順序一致;2. pRS PartitionKey徹底包含cRS PartitionKey。符合優化條件,會對執行計劃進行優化。
ReduceSinkDeDuplication將childRS和parentheRS與childRS之間的Operator刪掉,保留的RS的Key爲key,value字段,PartitionKey爲key字段。合併後的OperatorTree以下:
OperatorTree轉化爲MapReduce Job的過程分爲下面幾個階段
由上一步OperatorTree只生成了一個FileSinkOperator,直接生成一個MoveTask,完成將最終生成的HDFS臨時文件移動到目標表目錄下
MoveTask[Stage-0] Move Operator
將OperatorTree中的全部根節點保存在一個toWalk的數組中,循環取出數組中的元素(省略QB1,未畫出)
取出最後一個元素TS[p]放入棧 opStack{TS[p]}中
發現棧中的元素符合下面規則R1(這裏用python代碼簡單表示)
"".join([t + "%" for t in opStack]) == "TS%"
生成一個MapReduceTask[Stage-1]
對象,MapReduceTask[Stage-1]
對象的MapWork
屬性保存Operator根節點的引用。因爲OperatorTree之間之間的Parent Child關係,這個時候MapReduceTask[Stage-1]
包含了以TS[p]
爲根的全部Operator
繼續遍歷TS[p]的子Operator,將子Operator存入棧opStack中
當第一個RS進棧後,即棧opStack = {TS[p], FIL[18], RS[4]}時,就會知足下面的規則R2
"".join([t + "%" for t in opStack]) == "TS%.*RS%"
這時候在MapReduceTask[Stage-1]
對象的ReduceWork
屬性保存JOIN[5]
的引用
繼續遍歷JOIN[5]的子Operator,將子Operator存入棧opStack中
當第二個RS放入棧時,即當棧opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6]}
時,就會知足下面的規則R3
"".join([t + "%" for t in opStack]) == 「RS%.*RS%」 //循環遍歷opStack的每個後綴數組
這時候建立一個新的MapReduceTask[Stage-2]
對象,將OperatorTree從JOIN[5]
和RS[6]
之間剪開,併爲JOIN[5]
生成一個子Operator FS[19]
,RS[6]
生成一個TS[20]
,MapReduceTask[Stage-2]
對象的MapWork
屬性保存TS[20]
的引用。
新生成的FS[19]
將中間數據落地,存儲在HDFS臨時文件中。
繼續遍歷RS[6]的子Operator,將子Operator存入棧opStack中
當opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13]}
時,又會知足R3規則
同理生成MapReduceTask[Stage-3]
對象,並切開 Stage-2 和 Stage-3 的OperatorTree
最終將全部子Operator存入棧中以後,opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13], GBY[14], SEL[15], FS[17]}
知足規則R4
"".join([t + "%" for t in opStack]) == 「FS%」
這時候將MoveTask
與MapReduceTask[Stage-3]
鏈接起來,並生成一個StatsTask
,修改表的元信息
此時並無結束,還有兩個根節點沒有遍歷。
將opStack棧清空,將toWalk的第二個元素加入棧。會發現opStack = {TS[du]}
繼續知足R1 TS%,生成MapReduceTask[Stage-5]
繼續從TS[du]
向下遍歷,當opStack={TS[du], RS[7]}
時,知足規則R2 TS%.*RS%
此時將JOIN[8]
保存爲MapReduceTask[Stage-5]
的ReduceWork
時,發如今一個Map對象保存的Operator與MapReduceWork對象關係的Map<Operator, MapReduceWork>
對象中發現,JOIN[8]
已經存在。此時將MapReduceTask[Stage-2]
和MapReduceTask[Stage-5]
合併爲一個MapReduceTask
同理從最後一個根節點TS[c]
開始遍歷,也會對MapReduceTask進行合併
最後一個階段,將MapWork和ReduceWork中的OperatorTree以RS爲界限剪開
最終共生成3個MapReduceTask,以下圖
這裏不詳細介紹每一個優化器的原理,單獨介紹一下MapJoin的優化器
名稱 |
做用 |
---|---|
Vectorizer |
HIVE-4160,將在0.13中發佈 |
SortMergeJoinResolver |
與bucket配合,相似於歸併排序 |
SamplingOptimizer |
並行order by優化器,在0.12中發佈 |
CommonJoinResolver + MapJoinResolver |
MapJoin優化器 |
MapJoin簡單說就是在Map階段將小表讀入內存,順序掃描大表完成Join。
上圖是Hive MapJoin的原理圖,出自Facebook工程師Liyin Tang的一篇介紹Join優化的slice,從圖中能夠看出MapJoin分爲兩個階段:
經過MapReduce Local Task,將小表讀入內存,生成HashTableFiles上傳至Distributed Cache中,這裏會對HashTableFiles進行壓縮。
MapReduce Job在Map階段,每一個Mapper從Distributed Cache讀取HashTableFiles到內存中,順序掃描大表,在Map階段直接進行Join,將數據傳遞給下一個MapReduce任務。
若是Join的兩張表一張表是臨時表,就會生成一個ConditionalTask,在運行期間判斷是否使用MapJoin
CommonJoinResolver優化器就是將CommonJoin轉化爲MapJoin,轉化過程以下
遍歷上一個階段生成的MapReduce任務,發現MapReduceTask[Stage-2]
JOIN[8]
中有一張表爲臨時表,先對Stage-2進行深度拷貝(因爲須要保留原始執行計劃爲Backup Plan,因此這裏將執行計劃拷貝了一份),生成一個MapJoinOperator替代JoinOperator,而後生成一個MapReduceLocalWork讀取小表生成HashTableFiles上傳至DistributedCache中。
MapReduceTask通過變換後的執行計劃以下圖所示
MapJoinResolver優化器遍歷Task Tree,將全部有local work的MapReduceTask拆成兩個Task
最終MapJoinResolver處理完以後,執行計劃以下圖所示
從上述整個SQL編譯的過程,能夠看出編譯過程的設計有幾個優勢值得學習和借鑑
Hive依然在迅速的發展中,爲了提高Hive的性能,hortonworks公司主導的Stinger計劃提出了一系列對Hive的改進,比較重要的改進有:
咱們也將跟進社區的發展,結合自身的業務須要,提高Hive型ETL流程的性能
Antlr: http://www.antlr.org/
Wiki Antlr介紹: http://en.wikipedia.org/wiki/ANTLR
Hive Wiki: https://cwiki.apache.org/confluence/display/Hive/Home
HiveSQL編譯過程: http://www.slideshare.net/recruitcojp/internal-hive
Join Optimization in Hive: Join Strategies in Hive from the 2011 Hadoop Summit (Liyin Tang, Namit Jain)
Hive Design Docs: https://cwiki.apache.org/confluence/display/Hive/DesignDocs