How Hive Implements Group By, Join, Distinct, and More

Reposted; original sources:

 

Hive – How Distinct is implemented: http://ju.outofmemory.cn/entry/784

Hive – How Group By is implemented: http://ju.outofmemory.cn/entry/785

Hive – The JOIN execution process: http://ju.outofmemory.cn/entry/786

Analyzing how Hive executes LIMIT via the execution plan: http://yaoyinjie.blog.51cto.com/3189782/923378

Hive's DISTRIBUTE BY

ORDER BY produces a totally ordered result, but it does so using a single reducer, so it is extremely inefficient for large data sets. In many cases a global sort is not needed, and Hive's non-standard extension SORT BY can be used instead: SORT BY produces one sorted file per reducer. Sometimes you need to control which reducer a particular row goes to, usually so that a subsequent aggregation can be performed. Hive's DISTRIBUTE BY clause does exactly that.

 

-- Sort the weather data by year and temperature, ensuring that all rows
-- with the same year end up in the same reducer partition
FROM record2
SELECT year, temperature
DISTRIBUTE BY year
SORT BY year ASC, temperature DESC;


For this reason, DISTRIBUTE BY is often used together with SORT BY.
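As an aside (standard Hive shorthand, not mentioned in the original sources): when the DISTRIBUTE BY and SORT BY columns are identical and the sort is ascending, CLUSTER BY expresses both in one clause:

-- Equivalent to DISTRIBUTE BY year SORT BY year (ascending order only)
FROM record2
SELECT year, temperature
CLUSTER BY year;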

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Prepare the data

Statement

SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;
hive> SELECT * FROM logs;
OK
a	蘋果	3
a	橙子	3
a	燒雞	1
b	燒雞	3
hive> SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;

Group by count and compute the number of distinct users per group.

Computation process

[Figure: hive-distinct-cal]

1. The mapper first computes partial values, using (count, uid) as the key; if a row's distinct value has already been seen, it is skipped. Step one keys on the (count, uid) combination; step two keys on count alone.
2. ReduceSink runs at mapper.close(); the results are emitted at GroupByOperator.close(). Note that although the key is (count, uid), partitioning in the reduce phase is by count only (see the rewrite sketch after this list)!
3. The distinct value computed in step one is not actually used; only the reduce-phase computation is accurate. Step one merely reduces the number of rows with identical key combinations. For an ordinary count, however, the partial values do get merged later.
4. distinct decides whether to add 1 by comparing lastInvoke (rows arriving at the reducer are already sorted, so it checks whether the distinct field has changed; if it has not changed, it does not add 1).
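Because partitioning is by count alone, a single reducer receives every uid for a given count value, which can become a bottleneck on skewed data. A common workaround (a sketch, not from the original article) is to deduplicate with an inner GROUP BY first, so that the first job partitions by the full (count, uid) key:

-- Stage 1 deduplicates (count, uid) pairs, spreading the work across reducers;
-- stage 2 then only has to count the already-unique rows per count value
SELECT count, COUNT(*)
FROM (
  SELECT count, uid FROM logs GROUP BY count, uid
) t
GROUP BY count;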

Operator

[Figure: hive-distinct-op]

Explain

hive> explain select count, count(distinct uid) from logs group by count;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL count)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL uid)))) (TOK_GROUPBY (TOK_TABLE_OR_COL count))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan                        // table scan
            alias: logs
            Select Operator                // column pruning: uid and count are all we need
              expressions:
                    expr: count
                    type: int
                    expr: uid
                    type: string
              outputColumnNames: count, uid
              Group By Operator            // map-side aggregation first
                aggregations:
                      expr: count(DISTINCT uid)   // aggregation expression
                bucketGroup: false
                keys:
                      expr: count
                      type: int
                      expr: uid
                      type: string
                mode: hash                 // hash mode
                outputColumnNames: _col0, _col1, _col2
                Reduce Output Operator
                  key expressions:         // output keys
                        expr: _col0        // count
                        type: int
                        expr: _col1        // uid
                        type: string
                  sort order: ++
                  Map-reduce partition columns:   // partitioned by the GROUP BY column
                        expr: _col0        // i.e. count
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
      Reduce Operator Tree:
        Group By Operator                  // second aggregation
          aggregations:
                expr: count(DISTINCT KEY._col1:0._col0)   // uid per count
          bucketGroup: false
          keys:
                expr: KEY._col0            // count
                type: int
          mode: mergepartial               // merge partial aggregates
          outputColumnNames: _col0, _col1
          Select Operator                  // column pruning
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator           // write the result to a file
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
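If you want even more detail than the plan above (file locations, serde properties, and so on), Hive also accepts EXPLAIN EXTENDED (a standard Hive feature, noted here as an aside):

explain extended select count, count(distinct uid) from logs group by count;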

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Prepare the data

SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
hive> SELECT * FROM logs;
a	蘋果	5
a	橙子	3
a	蘋果	2
b	燒雞	1
hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
a	10
b	1

Computation process

[Figure: hive-groupby-cal]
hive.map.aggr=true is the default, so Hive first does a group-by on the mapper side and merges the results at the end, reducing the amount of data the reducers must process. Note that the mode fields in the explain output differ: the mapper's is hash while the reducer's is mergepartial. With hive.map.aggr=false, the group-by is done only in the reducer, and its mode is complete.
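You can observe this directly (a quick sketch using the real hive.map.aggr setting; run it in the Hive CLI):

set hive.map.aggr=false;
-- the mapper-side Group By Operator disappears from the plan,
-- and the reducer's Group By Operator reports mode: complete
explain SELECT uid, SUM(count) FROM logs GROUP BY uid;
set hive.map.aggr=true;   -- restore the default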

Operator

[Figure: hive-groupby-op]

Explain

hive> explain SELECT uid, sum(count) FROM logs group by uid;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan                        // table scan
            alias: logs
            Select Operator                // select the columns
              expressions:
                    expr: uid
                    type: string
                    expr: count
                    type: int
              outputColumnNames: uid, count
              Group By Operator            // present because hive.map.aggr=true by default: aggregate once in the mapper to cut the data the reducers must process
                aggregations:
                      expr: sum(count)     // aggregation function
                bucketGroup: false
                keys:                      // the key
                      expr: uid
                      type: string
                mode: hash                 // hash mode, processHashAggr()
                outputColumnNames: _col0, _col1
                Reduce Output Operator     // emit key/value pairs to the reducers
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)     // aggregate
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial               // merge partial values
          outputColumnNames: _col0, _col1
          Select Operator                  // select the columns
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator           // write the result to a file
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
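When the group keys are heavily skewed, Hive offers another knob (an aside, not covered in the original article): hive.groupby.skewindata=true makes Hive compile the query into two MR jobs, the first spreading rows randomly across reducers for partial aggregation and the second doing the final group-by:

set hive.groupby.skewindata=true;
explain SELECT uid, SUM(count) FROM logs GROUP BY uid;   -- the plan now shows two map-reduce stages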

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Prepare the data

Statement
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
What we want is to join in the users table and obtain the age field.

hive> SELECT * FROM logs;
OK
a	蘋果	5
a	橙子	3
b	燒雞	1
hive> SELECT * FROM users;
OK
a	23
b	21
hive> SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
a	蘋果	23
a	橙子	23
b	燒雞	21

Computation process

[Figure: hive-join-cal]

1. The number after the key is a tag, used in the reduce phase to tell which table a row came from; the tag is appended to the key. So why do a(0) and a(1) still end up together? Because the hash code is computed on the key a itself and set on the HiveKey, so rows with the same key land in the same partition.
2. The map phase only splits rows into keys and values.
3. The reduce phase does the actual merging: as the figure shows, the rows tagged 1 are simply appended after the rows tagged 0, and that is all there is to it (a map-side alternative that avoids this shuffle entirely is sketched after this list).
4. In the implementation, values are first buffered in storage: storage(0) = [{a, 蘋果}, {a, 橙子}], storage(1) = [{23}]. When the key changes (e.g. from a to b) or the input ends, endGroup() is called to merge the buffers into [{a, 蘋果, 23}, {a, 橙子, 23}].
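Everything above describes the common reduce-side (shuffle) join. When one side is small enough to fit in memory, Hive can instead load it into every mapper and skip the shuffle entirely; the classic way to request this is the MAPJOIN hint (a standard Hive feature, shown here as an aside):

-- load the small users table into each mapper; no reduce phase is needed
SELECT /*+ MAPJOIN(b) */ a.uid, a.name, b.age
FROM logs a JOIN users b ON (a.uid = b.uid);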

Operator

[Figure: hive-join-op]

Explain

hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
OK
// syntax tree
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age)))))

// stages
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:          // map phase
        a
          TableScan                        // table scan: rows are simply passed through one at a time
            alias: a
            Reduce Output Operator         // what gets sent to the reducers
              key expressions:             // the key, here uid, i.e. what we wrote in the ON clause; try adding more conditions
                    expr: uid
                    type: string
              sort order: +                // sorting
              Map-reduce partition columns:   // partition columns, apparently the same as the key
                    expr: uid
                    type: string
              tag: 0                       // marks which table this key came from
              value expressions:           // value columns the reducer needs
                    expr: uid
                    type: string
                    expr: name
                    type: string
        b
          TableScan                        // table scan: rows are simply passed through one at a time
            alias: b
            Reduce Output Operator         // what gets sent to the reducers
              key expressions:             // the key
                    expr: uid
                    type: string
              sort order: +
              Map-reduce partition columns:   // partition columns
                    expr: uid
                    type: string
              tag: 1                       // marks which table this key came from
              value expressions:           // the value
                    expr: age
                    type: int
      Reduce Operator Tree:                // reduce phase
        Join Operator                      // the JOIN operator
          condition map:
               Inner Join 0 to 1           // inner join of table 0 and table 1
          condition expressions:           // table 0 contributes uid and name; table 1 contributes age
            0 {VALUE._col0} {VALUE._col1}
            1 {VALUE._col1}
          handleSkewJoin: false            // whether to handle a skewed join; if true, the query splits into two MR jobs
          outputColumnNames: _col0, _col1, _col6   // output columns
          Select Operator                  // column pruning (the columns our SQL selects)
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col6
                  type: int
            outputColumnNames: _col0, _col1, _col2
            File Output Operator           // write the result to a file
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
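The handleSkewJoin: false field above is controlled by a real setting, hive.optimize.skewjoin (noted here as an aside): when enabled, keys whose row count exceeds hive.skewjoin.key are set aside and processed in a follow-up job, which is the two-MR-job split the plan comment refers to:

set hive.optimize.skewjoin=true;
explain SELECT a.uid, a.name, b.age FROM logs a JOIN users b ON (a.uid = b.uid);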

You can see that the whole plan is just a series of Operators executed one after another.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
