Hive調優

第 1 章 Explain 查看執行計劃(重點)

1.1 建立測試用表

  1)建大表、小表和 JOIN 後表的語句
// 建立大表
create table bigtable(id bigint, t bigint, uid string, keyword string, 
url_rank int, click_num int, click_url string) row format delimited 
fields terminated by '\t';
// 建立小表
create table smalltable(id bigint, t bigint, uid string, keyword string,
url_rank int, click_num int, click_url string) row format delimited
fields terminated by '\t';
// 建立 JOIN 後表
create table jointable(id bigint, t bigint, uid string, keyword string,
url_rank int, click_num int, click_url string) row format delimited
fields terminated by '\t';
  2)分別向大表和小表中導入數據
load data local inpath '/opt/module/data/bigtable' into table bigtable;
load data local inpath '/opt/module/data/smalltable' into table smalltable;

  3)數據下載地址:node

1.2 基本語法

EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query-sql

1.3 案例實操

  1)查看下面這條語句的執行計劃
explain select * from bigtable;

explain select click_url, count(*) ct from bigtable group by click_url;

  2)查看詳細執行計劃
explain extended select * from bigtable;
explain extended select click_url, count(*) ct from bigtable group by click_url;

第 2 章 Hive 建表優化

2.1 分區表

  分區表實際上就是對應一個 HDFS 文件系統上的獨立的文件夾,該文件夾下是該分區全部的數據文件。Hive 中的分區就是分目錄,把一個大的數據集根據業務須要分割成小的數據集。在查詢時經過 WHERE 子句中的表達式選擇查詢所須要的指定的分區,這樣的查詢效率會提升不少,因此咱們須要把經常用在 WHERE 語句中的字段指定爲表的分區字段。

2.1.1 分區表基本操做

  1)引入分區表(須要根據日期對日誌進行管理, 經過部門信息模擬)
dept_20200401.log
dept_20200402.log
dept_20200403.log
  2)建立分區表語法(分區字段不能是表中已經存在的數據,能夠將分區字段看做表的僞列)
create table dept_partition(
    deptno int, 
    dname string, 
    loc string
)
partitioned by (day string)
row format delimited fields terminated by '\t';
  3)加載數據到分區表中
    (1)數據準備:dept_20200401.log、dept_20200402.log、dept_20200403.log
10      ACCOUNTING      1700
20      RESEARCH        1800
30      SALES   1900
40      OPERATIONS      1700
50      TEST    2000
60      DEV     1900
    (2)加載數據(分區表加載數據時,必須指定分區)
load data local inpath '/opt/module/data/dept_20200401.log' into table dept_partition partition(day='20200401');
load data local inpath '/opt/module/data/dept_20200402.log' into table dept_partition partition(day='20200402');
load data local inpath '/opt/module/data/dept_20200403.log' into table dept_partition partition(day='20200403');

  4)查詢分區表中數據
    (1)單分區查詢
select * from dept_partition where day='20200401';

    (2)多分區聯合查詢
select * from dept_partition where day='20200401'
union
select * from dept_partition where day='20200402'
union
select * from dept_partition where day='20200403';
select * from dept_partition where day='20200401' or day='20200402' or day='20200403';

  5)增長分區
    (1)增長單個分區
alter table dept_partition add partition(day='20200404');

    (2)同時增長多個分區算法

alter table dept_partition add partition(day='20200405') partition(day='20200406');

  6)刪除分區
    (1)刪除單個分區
alter table dept_partition drop partition(day='20200406');

    (2)同時刪除多個分區sql

alter table dept_partition drop partition(day='20200404'),partition(day='20200405');
  7)查看分區表有多少分區
show partitions dept_partition;

  8)查看分區表結構
desc formatted dept_partition;

  9)思考: 若是一天的日誌數據量也很大,如何再將數據拆分?

2.1.2 二級分區

  1)建立二級分區表
create table dept_partition2(
deptno int,
dname string,
loc string)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
  2)正常的加載數據
    (1)加載數據到二級分區表中
load data local inpath '/opt/module/data/dept_20200401.log' into table dept_partition2 partition(day='20200401', hour='12');
    (2)查詢分區數據
select * from dept_partition2 where day='20200401' and hour='12';

2.1.3 動態分區

  關係型數據庫中,對分區表 Insert 數據時候,數據庫自動會根據分區字段的值,將數據插入到相應的分區中,Hive 中也提供了相似的機制,即動態分區(Dynamic Partition),只不過,使用 Hive 的動態分區,須要進行相應的配置。
  1)開啓動態分區參數設置
    (1)開啓動態分區功能(默認 true,開啓)
set hive.exec.dynamic.partition=true;
    (2)設置爲非嚴格模式(動態分區的模式,默認 strict,表示必須指定至少一個分區爲靜態分區,nonstrict 模式表示容許全部的分區字段均可以使用動態分區。)
set hive.exec.dynamic.partition.mode=nonstrict;
    (3)在全部執行 MR 的節點上,最大一共能夠建立多少個動態分區。默認 1000
set hive.exec.max.dynamic.partitions=1000;
    (4)在每一個執行 MR 的節點上,最大能夠建立多少個動態分區。該參數須要根據實際的數據來設定。好比:源數據中包含了一年的數據,即 day 字段有365 個值,那麼該參數就須要設置成大於 365,若是使用默認值 100,則會報錯。
set hive.exec.max.dynamic.partitions.pernode=400;
    (5)整個 MR Job 中,最大能夠建立多少個 HDFS 文件。默認 100000
set hive.exec.max.created.files=100000;
    (6)當有空分區生成時,是否拋出異常。通常不須要設置。默認 false
set hive.error.on.empty.partition=false;
  2)案例實操,需求:將 dept_partition 表中的數據按照地區(loc 字段),插入到目標表 dept_partition 的相應分區中。
    (1)建立目標分區表
create table dept_partition_dy(id int, name string) partitioned by (loc int) row format delimited fields terminated by '\t';
    (2)設置動態分區並插入數據
set hive.exec.dynamic.partition.mode = nonstrict;
insert into table dept_partition_dy partition(loc) select deptno, dname, loc from dept_partition;
    (3)查看目標分區表的分區狀況
show partitions dept_partition_dy;

2.2 分桶表

  分區提供一個隔離數據和優化查詢的便利方式。不過,並不是全部的數據集均可造成合理的分區。對於一張表或者分區,Hive 能夠進一步組織成桶,也就是更爲細粒度的數據範圍劃分。分桶是將數據集分解成更容易管理的若干部分的另外一個技術。分區針對的是數據的存儲路徑,分桶針對的是數據文件。

2.2.1 建立分桶表

  1)數據準備
vim /opt/module/data/student.txt
1001 ss1
1002 ss2
1003 ss3
1004 ss4
1005 ss5
1006 ss6
1007 ss7
1008 ss8
1009 ss9
1010 ss10
1011 ss11
1012 ss12
1013 ss13
1014 ss14
1015 ss15
1016 ss16
  2)建立分桶表
create table stu_buck(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by ' ';
  3)查看錶結構
desc formatted stu_buck;

  4)導入數據到分桶表中,load 的方式
load data local inpath '/opt/module/data/student.txt' into table stu_buck;
  5)查看建立的分桶表中是否分紅 4 個桶
  6)查詢分桶的數據
select * from stu_buck;

  7)分桶規則:根據結果可知,Hive 的分桶採用對分桶字段的值進行哈希,而後除以桶的個數求餘的方式決定該條記錄存放在哪一個桶當中

2.2.2 分桶表操做須要注意的事項

  1)reduce 的個數設置爲-1,讓 Job 自行決定須要用多少個 reduce 或者將 reduce 的個數設置爲大於等於分桶表的桶數
  2)從 hdfs 中 load 數據到分桶表中,避免本地文件找不到問題
  3)不要使用本地模式

2.2.3 insert 方式將數據導入分桶表

insert into table stu_buck select * from student_insert;

2.2.3 抽樣查詢

  對於很是大的數據集,有時用戶須要使用的是一個具備表明性的查詢結果而不是所有結果。Hive 能夠經過對錶進行抽樣來知足這個需求。語法:
TABLESAMPLE(BUCKET x OUT OF y)
  查詢表 stu_buck 中的數據(x 的值必須小於等於 y 的值)
select * from stu_buck tablesample(bucket 1 out of 4 on id);
hive (test)> select * from stu_buck tablesample(bucket 5 out of 4 on id);
FAILED: SemanticException [Error 10061]: Numerator should not be bigger than denominator in sample clause for table stu_buck

2.3 合適的文件格式

  Hive 支持的存儲數據的格式主要有:TEXTFILE 、SEQUENCEFILE、ORC、PARQUET。

2.3.1 列式存儲和行式存儲

  如圖所示左邊爲邏輯表,右邊第一個爲行式存儲,第二個爲列式存儲
  1)行存儲的特色
    查詢知足條件的一整行數據的時候,列存儲則須要去每一個彙集的字段找到對應的每一個列的值,行存儲只須要找到其中一個值,其他的值都在相鄰地方,因此此時行存儲查詢的速度更快。
  2)列存儲的特色
    由於每一個字段的數據彙集存儲,在查詢只須要少數幾個字段的時候,能大大減小讀取的數據量;每一個字段的數據類型必定是相同的,列式存儲能夠針對性的設計更好的設計壓縮算法。
  3)TEXTFILE 和 SEQUENCEFILE 的存儲格式都是基於行存儲的;
  4)ORC 和 PARQUET 是基於列式存儲的。

2.3.2 TextFile 格式

  默認格式,數據不作壓縮,磁盤開銷大,數據解析開銷大。可結合 Gzip、Bzip2 使用,但使用 Gzip 這種方式,hive 不會對數據進行切分,從而沒法對數據進行並行操做。

2.3.3 Orc 格式

  Orc (Optimized Row Columnar)是 Hive 0.11 版裏引入的新的存儲格式。

2.3.4 Parquet 格式

  Parquet 文件是以二進制方式存儲的,因此是不能夠直接讀取的,文件中包括該文件的數據和元數據,所以 Parquet 格式文件是自解析的。

2.4 合適的壓縮格式

  爲了支持多種壓縮/解壓縮算法,Hadoop 引入了編碼/解碼器,以下表所示:
壓縮格式            對應的編碼/解碼器
DEFLATE         org.apache.hadoop.io.compress.DefaultCodec
gzip            org.apache.hadoop.io.compress.GzipCodec
bzip2           org.apache.hadoop.io.compress.BZip2Codec
LZO             com.hadoop.compression.lzo.LzopCodec
Snappy          org.apache.hadoop.io.compress.SnappyCodec

  壓縮性能的比較:數據庫

第 3 章 HQL 語法優化

3.1 列裁剪與分區裁剪

  列裁剪就是在查詢時只讀取須要的列,分區裁剪就是隻讀取須要的分區。當列不少或者數據量很大時,若是 select * 或者不指定分區,全列掃描和全表掃描效率都很低。Hive 在讀數據的時候,能夠只讀取查詢中所須要用到的列,而忽略其餘的列。這樣作能夠節省讀取開銷:中間表存儲開銷和數據整合開銷。

3.2 Group By

  默認狀況下,Map 階段同一 Key 數據分發給一個 Reduce,當一個 key 數據過大時就傾斜了。並非全部的聚合操做都須要在 Reduce 端完成,不少聚合操做均可以先在 Map 端進行部分聚合,最後在 Reduce 端得出最終結果。
  開啓 Map 端聚合參數設置
    (1)是否在 Map 端進行聚合,默認爲 True
set hive.map.aggr = true;
    (2)在 Map 端進行聚合操做的條目數目
set hive.groupby.mapaggr.checkinterval = 100000;
    (3)有數據傾斜的時候進行負載均衡(默認是 false)
set hive.groupby.skewindata = true;
  當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。
    第一個 MR Job 中,Map 的輸出結果會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不一樣的 Reduce中,從而達到負載均衡的目的;
    第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做(雖然能解決數據傾斜,可是不能讓運行速度的更快)
create table emp(empno int,empname string,deptno int) partitioned by (day string) row format delimited fields terminated by ' ';
vim /opt/module/data/emp.txt
1 aa 10
2 bb 20
3 cc 30
load data local inpath '/opt/module/data/emp.txt' into table emp;
select deptno from emp group by deptno;

  優化之後(我這裏數據量比較小,看不出來效果,若數據量大了以後優化效果會很明顯,由於它會在Map端進行聚合)
set hive.groupby.skewindata = true;
select deptno from emp group by deptno;

3.3 Vectorization

  vectorization : 矢量計算的技術,在計算相似scan, filter, aggregation的時候,vectorization技術以設置批處理的增量大小爲 1024 行單次來達到比單條記錄單次得到更高的效率。
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true; 

3.4 多重模式

  若是你碰到一堆 SQL,而且這一堆 SQL 的模式還同樣。都是從同一個表進行掃描,作不一樣的邏輯。有可優化的地方:若是有 n 條 SQL,每一個 SQL 執行都會掃描一次這張表。
insert .... select id,name,sex, age from student where age > 17;
insert .... select id,name,sex, age from student where age > 18;
insert .... select id,name,sex, age from student where age > 19;
  -- 隱藏了一個問題:這種類型的 SQL 有多少個,那麼最終。這張表就被全表掃描了多少次
insert int t_ptn partition(city=A). select id,name,sex, age from student
where city= A;
insert int t_ptn partition(city=B). select id,name,sex, age from student
where city= B;
insert int t_ptn partition(city=c). select id,name,sex, age from student
where city= c;
  修改成:
from student
insert int t_ptn partition(city=A) select id,name,sex, age where city= A
insert int t_ptn partition(city=B) select id,name,sex, age where city= B
  若是一個 HQL 底層要執行 10 個 Job,那麼能優化成 8 個通常來講,確定能有所提升,多重插入就是一個很是實用的技能。一次讀取,屢次插入,有些場景是從一張表讀取數據後,要屢次利用。

3.5 in/exists 語句

  在 Hive 的早期版本中,in/exists 語法是不被支持的,可是從 hive-0.8x 之後就開始支持這個語法。可是不推薦使用這個語法。雖然通過測驗,Hive-2.3.6 也支持 in/exists 操做,但仍是推薦使用 Hive 的一個高效替代方案:left semi join
  好比說:-- in / exists 實現
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id =b.id);
  可使用 join 來改寫:
select a.id, a.name from a join b on a.id = b.id;
  應該轉換成:-- left semi join 實現
select a.id, a.name from a left semi join b on a.id = b.id;

3.6 CBO 優化

  join 的時候表的順序的關係:前面的表都會被加載到內存中。後面的表進行磁盤掃描
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
  Hive 自 0.14.0 開始,加入了一項 "Cost based Optimizer" 來對 HQL 執行計劃進行優化,這個功能經過 "hive.cbo.enable" 來開啓。在 Hive 1.1.0 以後,這個 feature 是默認開啓的,它能夠 自動優化 HQL 中多個 Join 的順序,並選擇合適的 Join 算法。
  CBO,成本優化器,代價最小的執行計劃就是最好的執行計劃。傳統的數據庫,成本優化器作出最優化的執行計劃是依據統計信息來計算的。Hive 的成本優化器也同樣,Hive 在提供最終執行前,優化每一個查詢的執行邏輯和物理執行計劃。這些優化工做是交給底層來完成的。根據查詢成本執行進一步的優化,從而產生潛在的不一樣決策:如何排序鏈接,執行哪一種類型的鏈接,並行度等等。
  要使用基於成本的優化(也稱爲 CBO),請在查詢開始設置如下參數:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

3.7 謂詞下推

  將 SQL 語句中的 where 謂詞邏輯都儘量提早執行,減小下游處理的數據量。對應邏輯優化器是 PredicatePushDown,配置項爲 hive.optimize.ppd,默認爲 true。
  案例實操:
  1)打開謂詞下推優化屬性
#謂詞下推,默認是 true
set hive.optimize.ppd = true;
   2)查看先關聯兩張表,再用 where 條件過濾的執行計劃
explain select o.id from bigtable b join bigtable o on o.id = b.id where o.id <= 10;
  3)查看子查詢後,再關聯表的執行計劃
explain select b.id from bigtable b join (select id from bigtable where id <= 10) o on b.id = o.id;

3.8 MapJoin

  MapJoin 是將 Join 雙方比較小的表直接分發到各個 Map 進程的內存中,在 Map 進程中進行 Join 操 做,這樣就不用進行 Reduce 步驟,從而提升了速度。若是不指定 MapJoin或者不符合 MapJoin 的條件,那麼 Hive 解析器會將 Join 操做轉換成 Common Join,即:在Reduce 階段完成 Join。容易發生數據傾斜。能夠用 MapJoin 把小表所有加載到內存在 Map端進行 Join,避免 Reducer 處理。
  1)開啓 MapJoin 參數設置
    (1)設置自動選擇 MapJoin
#默認爲 true
set hive.auto.convert.join=true;
    (2)大表小表的閾值設置(默認 25M 如下認爲是小表):
set hive.mapjoin.smalltable.filesize=25000000;
  2)MapJoin 工做機制
    MapJoin 是將 Join 雙方比較小的表直接分發到各個 Map 進程的內存中,在 Map 進程中進行 Join 操做,這樣就不用進行 Reduce 步驟,從而提升了速度。
  3)案例實操:
    (1)開啓 MapJoin 功能
#默認爲 true
set hive.auto.convert.join = true;
    (2)執行小表 JOIN 大表語句(此時小表(左鏈接)做爲主表,全部數據都要寫出去,所以此時會走 reduce,mapjoin失效)
Explain insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable b
on s.id = b.id;
Time taken: 52.581 seconds
    (3)執行大表 JOIN 小表語句
Explain insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
left join smalltable s
on s.id = b.id;
Time taken: 55.997 seconds

3.9 大表、大表 SMB Join(重點)

  SMB Join :Sort Merge Bucket Join
  1)建立第二張大表
create table bigtable2(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/bigtable' into table bigtable2;
  2)測試大表直接 JOIN
insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable a
join bigtable2 b
on a.id = b.id;
   測試結果:Time taken: 100.22 seconds
  3)建立分通表 1
create table bigtable_buck1(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
clustered by(id)
sorted by(id)
into 6 buckets
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/bigtable' into table bigtable_buck1;
  4)建立分通表 2,分桶數和第一張表的分桶數爲倍數關係
create table bigtable_buck2(
id bigint,
t bigint,
uid string,
keyword string,
url_rank int,
click_num int,
click_url string)
clustered by(id)
sorted by(id)
into 6 buckets
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/bigtable' into table bigtable_buck2;
  5)設置參數
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
  6)測試
insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable_buck1 s
join bigtable_buck2 b
on b.id = s.id;

    測試結果:Time taken: 96.226 secondsapache

3.10 笛卡爾積

  Join 的時候不加 on 條件,或者無效的 on 條件,由於找不到 Join key,Hive 只能使用 1個 Reducer 來完成笛卡爾積。當 Hive 設定爲嚴格模式(hive.mapred.mode=strict,nonstrict)時,不容許在 HQL 語句中出現笛卡爾積。

第 4 章 數據傾斜(重點)

  絕大部分任務都很快完成,只有一個或者少數幾個任務執行的很慢甚至最終執行失敗,這樣的現象爲數據傾斜現象。必定要和數據過量致使的現象區分開,數據過量的表現爲全部任務都執行的很慢,這個時候只有提升執行資源才能夠優化 HQL 的執行效率。
  綜合來看,致使數據傾斜的緣由在於按照 Key 分組之後,少許的任務負責絕大部分數據的計算,也就是說產生數據傾斜的 HQL 中必定存在分組操做,那麼從 HQL 的角度,咱們能夠將數據傾斜分爲單表攜帶了 GroupBy 字段的查詢和兩表(或者多表)Join 的查詢。

4.1 單表數據傾斜優化

4.1.1 使用參數

  當任務中存在 GroupBy 操做同時聚合函數爲 count 或者 sum 能夠設置參數來處理數據傾斜問題。
  是否在 Map 端進行聚合,默認爲 True
set hive.map.aggr = true;
  在 Map 端進行聚合操做的條目數目
set hive.groupby.mapaggr.checkinterval = 100000;
  有數據傾斜的時候進行負載均衡(默認是 false)
set hive.groupby.skewindata = true;
  當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。

4.1.2 增長 Reduce 數量(多個 Key 同時致使數據傾斜)

  1)調整 reduce 個數方法一
    (1)每一個 Reduce 處理的數據量默認是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000;
    (2)每一個任務最大的 reduce 數,默認爲 1009
set hive.exec.reducers.max = 1009;
    (3)計算 reducer 數的公式
N=min(參數 2,總輸入數據量/參數 1)(參數 2 指的是上面的 1009,參數 1 值得是 256M)
  2)調整 reduce 個數方法二,在 hadoop 的 mapred-default.xml 文件中修改,設置每一個 job 的 Reduce 個數
set mapreduce.job.reduces = 15;

4.2 Join 數據傾斜優化

4.2.1 使用參數

  在編寫 Join 查詢語句時,若是肯定是因爲 join 出現的數據傾斜,那麼請作以下設置:
# join 的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體數據量設置
set hive.skewjoin.key=100000;
# 若是是 join 過程出現傾斜應該設置爲 true
set hive.optimize.skewjoin=false;
  若是開啓了,在 Join 過程當中 Hive 會將計數超過閾值 hive.skewjoin.key(默認 100000)的傾斜 key 對應的行臨時寫進文件中,而後再啓動另外一個 job 作 map join 生成結果。經過hive.skewjoin.mapjoin.map.tasks 參數還能夠控制第二個 job 的 mapper 數量,默認 10000。
set hive.skewjoin.mapjoin.map.tasks=10000;

4.2.2 MapJoin

  開啓Map端Join

第 5 章 Hive Job 優化

5.1 Hive Map 優化

5.1.1 複雜文件增長 Map 數

  當 input 的文件都很大,任務邏輯複雜,map 執行很是慢的時候,能夠考慮增長 Map 數,來使得每一個 map 處理的數據量減小,從而提升任務的執行效率。
  增長 map 的方法爲:根據 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,調整 maxSize 最大值。讓 maxSize 最大值低於 blocksize 就能夠增長 map 的個數。
  案例實操:
  1)執行查詢
select count(*) from emp;
  2)設置最大切片值爲 100 個字節
set mapreduce.input.fileinputformat.split.maxsize=100;
select count(*) from emp;

5.1.2 小文件進行合併

  1)在 map 執行前合併小文件,減小 map 數:CombineHiveInputFormat 具備對小文件進行合併的功能(系統默認的格式)。HiveInputFormat 沒有對小文件合併功能。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  2)在 Map-Reduce 的任務結束時合併小文件的設置:
#在 map-only 任務結束時合併小文件,默認 true
set hive.merge.mapfiles = true;
#在 map-reduce 任務結束時合併小文件,默認 false
set hive.merge.mapredfiles = true;
#合併文件的大小,默認 256M
set hive.merge.size.per.task = 268435456;
#當輸出文件的平均大小小於該值時,啓動一個獨立的 map-reduce 任務進行文件 merge
set hive.merge.smallfiles.avgsize = 16777216;

5.1.3 Map 端聚合

#至關於 map 端執行 combiner
set hive.map.aggr=true;

5.1.4 推測執行

#默認是 true
set mapred.map.tasks.speculative.execution = true;

5.2 Hive Reduce 優化

5.2.1 合理設置 Reduce 數

  1)調整 reduce 個數方法一
    (1)每一個 Reduce 處理的數據量默認是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000;
    (2)每一個任務最大的 reduce 數,默認爲 1009
set hive.exec.reducers.max = 1009;
    (3)計算 reducer 數的公式
N=min(參數 2,總輸入數據量/參數 1)(參數 2 指的是上面的 1009,參數 1 值得是 256M)
  2)調整 reduce 個數方法二,在 hadoop 的 mapred-default.xml 文件中修改,設置每一個 job 的 Reduce 個數
set mapreduce.job.reduces = 15;
  3)reduce 個數並非越多越好
    (1)過多的啓動和初始化 reduce 也會消耗時間和資源;
    (2)另外,有多少個 reduce,就會有多少個輸出文件,若是生成了不少個小文件,那麼若是這些小文件做爲下一個任務的輸入,則也會出現小文件過多的問題;在設置 reduce 個數的時候也須要考慮這兩個原則:處理大數據量利用合適的 reduce 數;使單個 reduce 任務處理數據量大小要合適;

5.2.2 推測執行

mapred.reduce.tasks.speculative.execution (hadoop 裏面的)
hive.mapred.reduce.tasks.speculative.execution(hive 裏面相同的參數,效果和hadoop 裏面的同樣兩個隨便哪一個都行)

5.3 Hive 任務總體優化

5.3.1 Fetch 抓取

  Fetch 抓取是指,Hive 中對某些狀況的查詢能夠沒必要使用 MapReduce 計算。例如:SELECT * FROM emp;在這種狀況下,Hive 能夠簡單地讀取 emp 對應的存儲目錄下的文件,而後輸出查詢結果到控制檯。在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默認是 more,老版本 hive默認是 minimal,該屬性修改成 more 之後,在全局查找、字段查找、limit 查找等都不走mapreduce。
<property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing
latency.
Currently the query should be single sourced not having any subquery
and should not have any aggregations or distincts (which incurs RS),
lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and
virtual columns)
    </description>
</property>
  1)案例實操:
    (1)把 hive.fetch.task.conversion 設置成 none,而後執行查詢語句,都會執行 mapreduce程序。
set hive.fetch.task.conversion=none;
select * from emp;
select empname from emp;
select empname from emp limit 3;
    (2)把 hive.fetch.task.conversion 設置成 more,而後執行查詢語句,以下查詢方式都不會執行 mapreduce 程序。
set hive.fetch.task.conversion=more;
select * from emp;
select empname from emp;
select empname from emp limit 3;

5.3.2 本地模式

  大多數的 Hadoop Job 是須要 Hadoop 提供的完整的可擴展性來處理大數據集的。不過,有時 Hive 的輸入數據量是很是小的。在這種狀況下,爲查詢觸發執行任務消耗的時間可能會比實際 job 的執行時間要多的多。對於大多數這種狀況,Hive 能夠經過本地模式在單臺機器上處理全部的任務。對於小數據集,執行時間能夠明顯被縮短。
  用戶能夠經過設置 hive.exec.mode.local.auto 的值爲 true,來讓 Hive 在適當的時候自動啓動這個優化。
set hive.exec.mode.local.auto=true; //開啓本地 mr
//設置 local mr 的最大輸入數據量,當輸入數據量小於這個值時採用 local mr 的方式,默認爲 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設置 local mr 的最大輸入文件個數,當輸入文件個數小於這個值時採用 local mr 的方式,默認爲 4
set hive.exec.mode.local.auto.input.files.max=10;
  1)案例實操:
    (1)開啓本地模式,並執行查詢語句
set hive.exec.mode.local.auto=true;
select * from emp cluster by deptno;
Time taken: 1.443 seconds, Fetched: 3 row(s)
    (2)關閉本地模式,並執行查詢語句
set hive.exec.mode.local.auto=false;
select * from emp cluster by deptno;
Time taken: 19.493 seconds, Fetched: 3 row(s)

5.3.3 並行執行

  Hive 會將一個查詢轉化成一個或者多個階段。這樣的階段能夠是 MapReduce 階段、抽樣階段、合併階段、limit 階段。或者 Hive 執行過程當中可能須要的其餘階段。默認狀況下,Hive 一次只會執行一個階段。不過,某個特定的 job 可能包含衆多的階段,而這些階段可能並不是徹底互相依賴的,也就是說有些階段是能夠並行執行的,這樣可能使得整個 job 的執行時間縮短。不過,若是有更多的階段能夠並行執行,那麼 job 可能就越快完成。
  經過設置參數 hive.exec.parallel 值爲 true,就能夠開啓併發執行。不過,在共享集羣中,須要注意下,若是 job 中並行階段增多,那麼集羣利用率就會增長。
set hive.exec.parallel=true; //打開任務並行執行,默認爲 false
set hive.exec.parallel.thread.number=16; //同一個 sql 容許最大並行度,默認爲 8
  固然,得是在系統資源比較空閒的時候纔有優點,不然,沒資源,並行也起不來(建議在數據量大,sql 很長的時候使用,數據量小,sql 比較的小開啓有可能還不如以前快)

5.3.4 嚴格模式

  Hive 能夠經過設置防止一些危險操做:
  1)分區表不使用分區過濾
    將 hive.strict.checks.no.partition.filter 設置爲 true 時,對於分區表,除非 where 語句中含有分區字段過濾條件來限制範圍,不然不容許執行。換句話說,就是用戶不容許掃描全部分區。進行這個限制的緣由是,一般分區表都擁有很是大的數據集,並且數據增長迅速。沒有進行分區限制的查詢可能會消耗使人不可接受的巨大資源來處理這個表。
  2)使用 order by 沒有 limit 過濾
    將 hive.strict.checks.orderby.no.limit 設置爲 true 時,對於使用了 order by 語句的查詢,要求必須使用 limit 語句。由於 order by 爲了執行排序過程會將全部的結果數據分發到同一個Reducer 中進行處理,強制要求用戶增長這個 LIMIT 語句能夠防止 Reducer 額外執行很長一段時間(開啓了 limit 能夠在數據進入到 reduce 以前就減小一部分數據)。
  3)笛卡爾積
    將 hive.strict.checks.cartesian.product 設置爲 true 時,會限制笛卡爾積的查詢。對關係型數據庫很是瞭解的用戶可能指望在 執行 JOIN 查詢的時候不使用 ON 語句而是使用 where 語句,這樣關係數據庫的執行優化器就能夠高效地將 WHERE 語句轉化成那個 ON 語句。不幸的是,Hive 並不會執行這種優化,所以,若是表足夠大,那麼這個查詢就會出現不可控的狀況。

5.3.5 JVM 重用

  小文件過多的時候使用。

第 6 章 Hive On Spark

6.1 Executor 參數

  以單臺服務器 128G 內存,32 線程爲例。

6.1.1 spark.executor.cores

  該參數表示每一個 Executor 可利用的 CPU 核心數。其值不宜設定過大,由於 Hive 的底層以 HDFS 存儲,而 HDFS 有時對高併發寫入處理不太好,容易形成 race condition。根據經驗實踐,設定在 3~6 之間比較合理。假設咱們使用的服務器單節點有 32 個 CPU 核心可供使用。考慮到系統基礎服務和 HDFS等組件的餘量,通常會將 YARN NodeManager 的 yarn.nodemanager.resource.cpu-vcores 參數設爲 28,也就是 YARN 可以利用其中的 28 核,此時將 spark.executor.cores 設爲 4 最合適,最多能夠正好分配給 7 個 Executor 而不形成浪費。
  又假設 yarn.nodemanager.resource.cpu-vcores 爲 26,那麼將 spark.executor.cores 設爲 5 最合適,只會剩餘 1 個核。因爲一個 Executor 須要一個 YARN Container 來運行,因此還需保證spark.executor.cores的值不能大於單個 Container 能申請到的最大核心數,即 yarn.scheduler.maximum-allocation-vcores 的值。

6.1.2 spark.executor.memory/spark.yarn.executor.memoryOverhead

  這兩個參數分別表示每一個 Executor 可利用的堆內內存量和堆外內存量。堆內內存越大,Executor 就能緩存更多的數據,在作諸如 map join 之類的操做時就會更快,但同時也會使得GC 變得更麻煩。spark.yarn.executor.memoryOverhead 的默認值是 executorMemory * 0.10,最小值爲 384M(每一個 Executor)。Hive 官方提供了一個計算 Executor 總內存量的經驗公式,以下:
yarn.nodemanager.resource.memory-mb*(spark.executor.cores/yarn.nodemanager.resource.cpu-vcores)
  其實就是按核心數的比例分配。在計算出來的總內存量中,80%~85%劃分給堆內內存,剩餘的劃分給堆外內存。假設集羣中單節點有 128G 物理內存,yarn.nodemanager.resource.memory-mb(即單個NodeManager 可以利用的主機內存量)設爲 100G,那麼每一個 Executor 大概就是 100*(4/28)=約 14G。再 按 8:2 比 例 劃 分 的 話 , 最 終 spark.executor.memory 設 爲 約 11.2G ,spark.yarn.executor.memoryOverhead 設爲約 2.8G。經過這些配置,每一個主機一次能夠運行多達 7 個 executor。每一個 executor 最多能夠運行4 個 task(每一個核一個)。
  所以,每一個 task 平均有 3.5 GB(14 / 4)內存。在 executor 中運行的全部 task 共享相同的堆空間。
set spark.executor.memory=11.2g;
set spark.yarn.executor.memoryOverhead=2.8g;
  同理,這兩個內存參數相加的總量也不能超過單個 Container 最多能申請到的內存量,即 yarn.scheduler.maximum-allocation-mb 配置的值。

6.1.3 spark.executor.instances

  該參數表示執行查詢時一共啓動多少個 Executor 實例,這取決於每一個節點的資源分配狀況以及集羣的節點數。若咱們一共有 10 臺 32C/128G 的節點,並按照上述配置(即每一個節點承載 7 個 Executor),那麼理論上講咱們能夠將 spark.executor.instances 設爲 70,以使集羣資源最大化利用。可是實際上通常都會適當設小一些(推薦是理論值的一半左右,好比 40),由於 Driver 也要佔用資源,而且一個 YARN 集羣每每還要承載除了 Hive on Spark 以外的其餘業務。

6.1.4 spark.dynamicAllocation.enabled

  上面所說的固定分配 Executor 數量的方式可能不太靈活,尤爲是在 Hive 集羣面向不少用戶提供分析服務的狀況下。因此更推薦將 spark.dynamicAllocation.enabled 參數設爲 true,以啓用 Executor 動態分配。

6.1.5 參數配置樣例參考

set hive.execution.engine=spark;
set spark.executor.memory=11.2g;
set spark.yarn.executor.memoryOverhead=2.8g;
set spark.executor.cores=4;
set spark.executor.instances=40;
set spark.dynamicAllocation.enabled=true;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

6.2 Driver 參數

6.2.1 spark.driver.cores

  該參數表示每一個 Driver 可利用的 CPU 核心數。絕大多數狀況下設爲 1 都夠用。

6.2.2 spark.driver.memory/spark.driver.memoryOverhead

  這兩個參數分別表示每一個 Driver 可利用的堆內內存量和堆外內存量。根據資源富餘程度和做業的大小,通常是將總量控制在 512MB~4GB 之間,而且沿用 Executor 內存的「二八分配方式」。例如,spark.driver.memory 能夠設爲約 819MB,spark.driver.memoryOverhead 設爲約 205MB,加起來正好 1G。
相關文章
相關標籤/搜索