Preparing the data

The query under study: SELECT uid, SUM(count) FROM logs GROUP BY uid;

hive> SELECT * FROM logs;
a       apple          5
a       orange         3
a       apple          2
b       roast chicken  1

hive> SELECT uid, SUM(count) FROM logs GROUP BY uid;
a       10
b       1

Computation process

By default hive.map.aggr=true, so Hive first does a partial group by on the mapper side and then merges the partial results, in order to reduce the amount of data the reducers must process. Note that the mode shown in the explain output differs between the two sides: the mapper-side Group By Operator runs in hash mode, the reducer side in mergepartial. If you set hive.map.aggr=false, the group by is done entirely on the reducer side, and its mode is complete.
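
To build intuition for what hash-mode map-side aggregation does, here is a minimal, runnable Java sketch of the idea. It is an illustration only, not Hive's GroupByOperator; the class and variable names are invented for this example.

import java.util.HashMap;
import java.util.Map;

// Simplified illustration of map-side partial aggregation (mode = hash).
// Hive's operator manages AggregationBuffers and memory limits; here a
// plain HashMap of partial sums per group key stands in for it.
public class MapSideHashAggSketch {
  public static void main(String[] args) {
    String[][] rows = { {"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"} };

    // Mapper side: accumulate partial sums in a hash table keyed by uid.
    Map<String, Long> partial = new HashMap<>();
    for (String[] row : rows) {
      partial.merge(row[0], Long.parseLong(row[1]), Long::sum);
    }
    // The mapper emits (uid, partialSum) pairs; the reducer merges the
    // partials per key (mode = mergepartial) instead of seeing raw rows.
    partial.forEach((uid, sum) -> System.out.println(uid + "\t" + sum));
  }
}
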
Operator

Explain

hive> explain SELECT uid, sum(count) FROM logs group by uid;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan
            alias: logs
            Select Operator
              expressions:
                    expr: uid
                    type: string
                    expr: count
                    type: int
              outputColumnNames: uid, count
              Group By Operator
                aggregations:
                      expr: sum(count)
                bucketGroup: false
                keys:
                      expr: uid
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

hive> select distinct value from src;
hive> select max(key) from src;

Because there are no grouping keys, only a single reducer is used.

2.2 If the query has aggregate functions or a GROUP BY, the processing is as follows:

A Select Operator that selects all columns is inserted first, so that the ColumnPruner optimization can work on it in the optimization phase.

2.2.1 hive.map.aggr is true (the default): map-side partial aggregation is enabled.

2.2.1.1 hive.groupby.skewindata is false (the default): the group-by data is not skewed.

Generated operators: GroupByOperator + ReduceSinkOperator + GroupByOperator.

GroupByOperator + ReduceSinkOperator run on the map side; the first GroupByOperator does the partial aggregation there. The second GroupByOperator does the group by on the reduce side.

2.2.1.2 hive.groupby.skewindata is true.

Generated operators: GroupByOperator + ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:

GroupByOperator + ReduceSinkOperator (map phase of the first MapredTask)

GroupByOperator (reduce phase of the first MapredTask)

ReduceSinkOperator (map phase of the second MapredTask)

GroupByOperator (reduce phase of the second MapredTask)

2.2.2 hive.map.aggr is false.

2.2.2.1 hive.groupby.skewindata is true.

Generated operators: ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:

ReduceSinkOperator (map phase of the first MapredTask)

GroupByOperator (reduce phase of the first MapredTask)

ReduceSinkOperator (map phase of the second MapredTask)

GroupByOperator (reduce phase of the second MapredTask)

2.2.2.2 hive.groupby.skewindata is false.

Generated operators: ReduceSinkOperator (runs in the map phase) + GroupByOperator (runs in the reduce phase). The four combinations are summarized in the table below.
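
For quick reference, the four setting combinations and the plan shapes they produce (the group-by modes are the ones that appear in the explain outputs later in this section):

hive.map.aggr  hive.groupby.skewindata  MR jobs  operator pipeline (group-by modes)
false          false                    1        ReduceSink -> GroupBy(complete)
true           false                    1        GroupBy(hash) -> ReduceSink -> GroupBy(mergepartial)
false          true                     2        ReduceSink -> GroupBy(partial1) -> ReduceSink -> GroupBy(final)
true           true                     2        GroupBy(hash) -> ReduceSink -> GroupBy(partials) -> ReduceSink -> GroupBy(final)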

Case 1:

set hive.map.aggr=false;
set hive.groupby.skewindata=false;

SemanticAnalyzer.genGroupByPlan1MR() {
  (1) ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
  (2) GroupByOperator: GroupByDesc.Mode.COMPLETE, Reducer: iterate/merge (mode = COMPLETE)
}

Case 2:

set hive.map.aggr=true;
set hive.groupby.skewindata=false;

SemanticAnalyzer.genGroupByPlanMapAggr1MR() {
  (1) GroupByOperator: GroupByDesc.Mode.HASH. The aggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
  (2) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  (3) GroupByOperator: GroupByDesc.Mode.MERGEPARTIAL, Reducer: iterate/terminate if DISTINCT, merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}

Case 3:

set hive.map.aggr=false;
set hive.groupby.skewindata=true;

SemanticAnalyzer.genGroupByPlan2MR() {
  (1) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  (2) GroupByOperator: GroupByDesc.Mode.PARTIAL1, Reducer: iterate/terminatePartial (mode = PARTIAL1)
  (3) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  (4) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}

Case 4:

set hive.map.aggr=true;
set hive.groupby.skewindata=true;

SemanticAnalyzer.genGroupByPlanMapAggr2MR() {
  (1) GroupByOperator: GroupByDesc.Mode.HASH, Mapper: iterate/terminatePartial (mode = HASH)
  (2) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT.
  (3) GroupByOperator: GroupByDesc.Mode.PARTIALS, Reducer: iterate/terminatePartial if DISTINCT, merge/terminatePartial if NO DISTINCT (mode = PARTIALS)
  (4) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  (5) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}

ReduceSinkOperator.processOp(Object row, int tag) sets the key's hash value according to the conditions above. Take the first ReduceSinkOperator of case 4 as an example (Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT): if there is no DISTINCT column, the key's hash value is set to a random number before OutputCollector.collect is called (random = new Random(12345);). If there is a DISTINCT column, the key's hash value is derived from the grouping + distinct key.
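
A minimal sketch of that partitioning idea follows. It is an illustration, not the actual ReduceSinkOperator source; keyHash, groupingKey and distinctKey are invented names. Without DISTINCT, the first stage scatters rows randomly so hot keys are spread over all reducers, and the second MR job repartitions by the real grouping key; with DISTINCT, rows for the same grouping + distinct key must still meet on one reducer.

import java.util.Random;

// Illustrative only: how a skew-tolerant first-stage ReduceSink can pick a
// partition. Hive seeds its Random with 12345, as quoted above.
public class SkewPartitionSketch {
  private static final Random random = new Random(12345);

  // keyHash decides which reducer receives the row in the first MR job.
  static int keyHash(Object[] groupingKey, Object[] distinctKey, boolean hasDistinct) {
    if (!hasDistinct) {
      // No DISTINCT: scatter rows randomly; partial aggregates for the same
      // grouping key may land on different reducers and are merged in job 2.
      return random.nextInt();
    }
    // With DISTINCT: hash the combined grouping + distinct key instead.
    int hash = 0;
    for (Object k : groupingKey) hash = 31 * hash + k.hashCode();
    for (Object k : distinctKey) hash = 31 * hash + k.hashCode();
    return hash;
  }

  public static void main(String[] args) {
    System.out.println(keyHash(new Object[] {"a"}, new Object[] {}, false));
    System.out.println(keyHash(new Object[] {"a"}, new Object[] {"x"}, true));
  }
}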

GroupByOperator:

  initializeOp(Configuration hconf)
  processOp(Object row, int tag)
  closeOp(boolean abort)
  forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
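
The hash-mode flow through those four methods can be sketched as follows. This is a simplified model under assumed types, not the real operator, which works with AggregationBuffer[] per aggregation, ObjectInspectors, and the memory limits (the 0.5/0.9 defaults mentioned below); here a single sum aggregation stands in.

import java.util.HashMap;
import java.util.Map;

// Simplified model of a hash-mode GroupByOperator: initializeOp builds the
// hash table, processOp folds each row into its group's buffer, closeOp
// flushes every group downstream via forward.
public class GroupByOperatorSketch {
  private Map<String, long[]> hashAggregations;   // group key -> aggregation buffer

  void initializeOp() {
    hashAggregations = new HashMap<>();           // created once per operator
  }

  void processOp(String key, long value) {
    long[] buf = hashAggregations.computeIfAbsent(key, k -> new long[1]);
    buf[0] += value;                              // iterate: fold row into buffer
  }

  void closeOp(boolean abort) {
    if (abort) return;
    for (Map.Entry<String, long[]> e : hashAggregations.entrySet()) {
      forward(e.getKey(), e.getValue());          // emit (key, partial aggregate)
    }
  }

  void forward(String key, long[] aggs) {
    System.out.println(key + "\t" + aggs[0]);     // stand-in for the next operator
  }

  public static void main(String[] args) {
    GroupByOperatorSketch op = new GroupByOperatorSketch();
    op.initializeOp();
    op.processOp("a", 5); op.processOp("a", 3); op.processOp("a", 2);
    op.processOp("b", 1);
    op.closeOp(false);
  }
}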

groupby10.q groupby11.q

set hive.map.aggr=false;
set hive.groupby.skewindata=false;

EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1
SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5))
GROUP BY INPUT.key;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: complete
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

set hive.map.aggr=true;
set hive.groupby.skewindata=false;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Group By Operator
                aggregations:
                      expr: count(substr(value, 5))
                      expr: count(DISTINCT substr(value, 5))
                bucketGroup: false
                keys:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                        expr: _col1
                        type: string
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
                        expr: _col3
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

set hive.map.aggr=false;
set hive.groupby.skewindata=true;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: partial1
          outputColumnNames: _col0, _col1, _col2
          File Output Operator
            compressed: false
            GlobalTableId: 0
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs:
          Reduce Output Operator
            key expressions:
                  expr: _col0
                  type: int
            sort order: +
            Map-reduce partition columns:
                  expr: _col0
                  type: int
            tag: -1
            value expressions:
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: final
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

set hive.map.aggr=true;
set hive.groupby.skewindata=true;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Group By Operator
                aggregations:
                      expr: count(substr(value, 5))
                      expr: count(DISTINCT substr(value, 5))
                bucketGroup: false
                keys:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                        expr: _col1
                        type: string
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
                        expr: _col3
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: partials
          outputColumnNames: _col0, _col1, _col2
          File Output Operator
            compressed: false
            GlobalTableId: 0
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs:
          Reduce Output Operator
            key expressions:
                  expr: _col0
                  type: int
            sort order: +
            Map-reduce partition columns:
                  expr: _col0
                  type: int
            tag: -1
            value expressions:
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: final
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

set hive.map.aggr=false;
set hive.groupby.skewindata=false;

EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1
SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5))
GROUP BY INPUT.key;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Needs Tagging: false
      Path -> Alias:
        hdfs:
      Path -> Partition:
        hdfs:
          Partition
            base file name: input
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              columns key,value
              columns.types int:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs:
              name input
              serialization.ddl struct input { i32 key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              transient_lastDdlTime 1310523947
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              columns key,value
              columns.types int:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs:
              name input
              serialization.ddl struct input { i32 key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              transient_lastDdlTime 1310523947
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            name: input
            name: input
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: complete
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                directory: hdfs:
                NumFilesPerFileSink: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      columns key,val1,val2
                      columns.types int:int:int
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs:
                      name dest1
                      serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      transient_lastDdlTime 1310523946
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1
                TotalFiles: 1
                MultiFileSpray: false

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          source: hdfs:
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                bucket_count -1
                columns key,val1,val2
                columns.types int:int:int
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs:
                name dest1
                serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                transient_lastDdlTime 1310523946
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1
          tmp directory: hdfs:

ABSTRACT SYNTAX TREE:
  (TOK_QUERY
    (TOK_FROM (TOK_TABREF INPUT))
    (TOK_INSERT
      (TOK_DESTINATION (TOK_TAB dest1))
      (TOK_SELECT
        (TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
        (TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
        (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
      )
      (TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
    )
  )

SemanticAnalyzer.genBodyPlan(QB qb, Operator input) {
  if (qbp.getAggregationExprsForClause(dest).size() != 0
      || getGroupByForClause(qbp, dest).size() > 0) {

    // Skew handling supports at most one DISTINCT expression.
    if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")
        && qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
      throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS
          .getMsg());
    }

    // Select all columns first, for the ColumnPruner optimization.
    curr = insertSelectAllPlanForGroupBy(dest, curr);
    if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
        .equalsIgnoreCase("true")) {
      if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
          .equalsIgnoreCase("false")) {
        curr = genGroupByPlanMapAggr1MR(dest, qb, curr);  // case 2
      } else {
        curr = genGroupByPlanMapAggr2MR(dest, qb, curr);  // case 4
      }
    } else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")) {
      curr = genGroupByPlan2MR(dest, qb, curr);           // case 3
    } else {
      curr = genGroupByPlan1MR(dest, qb, curr);           // case 1
    }
  }
}

Note the first check: with hive.groupby.skewindata=true, a query with more than one DISTINCT expression is rejected with UNSUPPORTED_MULTIPLE_DISTINCTS.

Test outputs that cover distinct:

count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out

(1) Map-side partial aggregation, no data skew: one MR job.

genGroupByPlanMapAggr1MR generates three operators:

(1.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to groupByKeys and outputColumnNames.

Handles DISTINCT in the SELECT (getDistinctFuncExprsForClause): the distinct columns are added to groupByKeys and outputColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated columns are added to outputColumnNames.

public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
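
Putting the listed values into the constructor above, the map-side GroupByDesc would be built roughly as follows. This is an illustrative fragment assuming the surrounding SemanticAnalyzer context; groupByKeys, outputColumnNames and aggregations stand for the lists assembled from the GROUP BY clause, the DISTINCT columns and the aggregate functions.

// Hypothetical call matching the parameter values listed above.
GroupByDesc mapSideGroupBy = new GroupByDesc(
    GroupByDesc.Mode.HASH,   // mode
    outputColumnNames,       // groupby + distinct + aggregation columns
    groupByKeys,             // keys: groupby + distinct columns
    aggregations,            // aggregators: one AggregationDesc per function
    false,                   // groupKeyNotReductionKey
    0.5f,                    // groupByMemoryUsage (default)
    0.9f);                   // memoryThreshold (default)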

(1.2) ReduceSinkOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to reduceKeys and outputKeyColumnNames.

Handles DISTINCT in the SELECT (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.

public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;
  this.numDistributionKeys = numDistributionKeys;
  this.valueCols = valueCols;
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;
  this.numReducers = numReducers;
  this.partitionCols = partitionCols;
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}
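
And correspondingly for the ReduceSinkDesc, under the same illustrative naming. This is a sketch, not the actual call site: in particular, numDistributionKeys being the number of groupby keys and numReducers being -1 (let Hive decide) are assumptions; the partition columns being the grouping key follows the case 2 description above.

// Hypothetical call: keys carry groupby + distinct columns, values carry
// the columns being aggregated, partitioning is on the grouping key only.
ReduceSinkDesc rsDesc = new ReduceSinkDesc(
    reduceKeys,               // keyCols: groupby + distinct columns (sort key)
    numGroupByKeys,           // numDistributionKeys: assumed = number of groupby keys
    reduceValues,             // valueCols: columns fed to the aggregations
    outputKeyColumnNames,
    distinctColIndices,       // positions of the distinct columns in the key
    outputValueColumnNames,
    -1,                       // tag: -1, as seen in the plans above
    partitionCols,            // grouping key columns
    -1,                       // numReducers: assumed -1 = let Hive decide
    keySerializeInfo,
    valueSerializeInfo);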

(1.3) GroupByOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to groupByKeys and outputColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to aggregations and outputColumnNames.

(same GroupByDesc constructor as in (1.1))

mode: GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default

(2) Map-side partial aggregation, data skew: two MR jobs.

genGroupByPlanMapAggr2MR:

(2.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to groupByKeys and outputColumnNames.

Handles DISTINCT in the SELECT (getDistinctFuncExprsForClause): the distinct columns are added to groupByKeys and outputColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated columns are added to outputColumnNames.

(same GroupByDesc constructor as in (1.1))

mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default

(2.2) ReduceSinkOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to reduceKeys and outputKeyColumnNames.

Handles DISTINCT in the SELECT (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.

(same ReduceSinkDesc constructor as in (1.2))

(2.3) GroupByOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to groupByKeys and outputColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated columns are added to outputColumnNames.

(same GroupByDesc constructor as in (1.1))

mode: GroupByDesc.Mode.PARTIALS
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default

(2.4) ReduceSinkOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to reduceKeys and outputKeyColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.

(same ReduceSinkDesc constructor as in (1.2))

(2.5) GroupByOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to groupByKeys and outputColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the columns to aggregate are added to outputColumnNames.

(same GroupByDesc constructor as in (1.1))

mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default

(3) No map-side partial aggregation, data skew: two MR jobs.

genGroupByPlan2MR:

(3.1) ReduceSinkOperator

Handles the GROUP BY clause (getGroupByForClause): the groupby columns are added to reduceKeys and outputKeyColumnNames.

Handles DISTINCT in the SELECT (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames.

Handles the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.

(same ReduceSinkDesc constructor as in (1.2))