第4節 hive調優:一、二、fetch抓取和表的優化

hive的調優:
第一個調優:fetch抓取,可以避免使用mr的,就儘可能不要用mr,由於mr太慢了
set hive.fetch.task.conversion=more 表示咱們的全局查找,字段查找,limit查找都不走mr
這個屬性配置有三個取值 more minimal none 若是配置成none,全部的都要走mr程序

hive的本地模式:
set hive.exec.mode.local.auto=true 開啓本地模式,解決多個小文件輸入的時候,分配資源時間超過數據的計算時間
set hive.exec.mode.local.auto.inputbytes.max=51234560; 設置輸入的數據臨界值,若是小於這值都認爲是小任務模式,啓動本地模式來執行
set hive.exec.mode.local.auto.input.files.max=10; 設置輸入文件個數的臨界值,若是小於這個數量,那麼也認爲是小任務模式node

注:mapreduce中還有一個小任務模式:linux

hadoop當中的小任務模式 ubermode:
啓動一個maptask,分配資源花了30s,而後maptask去處理一個小文件,花了3s
hadoop當中的小任務模式:
mapreduce.job.ubertask.enable 設置咱們須要開啓hadoop任務的小任務模式 小任務模式能夠根據咱們輸入的數據量作判斷,若是輸入的數據量比較小
輸入10個文件,每一個文件2KB,輸入的數據總量也就是20KB,10個小文件,佔用10個block塊,每一個block塊對應要啓動一個maptask
能夠考慮使用ubermode 小任務模式來實現全部的數據就在一個maptask裏面去處理。sql

set mapreduce.job.ubertask.enable; //查看該參數的值。數據庫

set mapreduce.job.ubertask.enable=true; //設置該參數的值數據結構



第二個優化:hive表的優化
去重的優化:
select count(distinct s_id) from score;這種寫法全部的去重數據都會在一個reduce當中去,形成數據處理比較慢
select count(1) from (select s_id from score group by s_id) bysid; 這種寫法,使用了一個嵌套子查詢,先對數據進行group by去重,而後再進行統計
儘可能避免大sql,能夠將一個很大的sql拆成多段,分步的去執行

大表join大表的優化:
空key的過濾

不過濾:
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
結果:
No rows affected (152.135 seconds)負載均衡

過濾:過濾掉咱們全部的爲null的id,使得咱們的輸入數據量變少
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
結果:
No rows affected (141.585 seconds)函數

空key的轉換
若是規定這些空key過濾不調,那麼咱們能夠對空key進行轉換
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;
若是空key比較多,那麼就會將大量的空key轉換成 hive,那麼就會遇到一個問題,數據傾斜
數據傾斜的表現形式:有一個reduce處理的數據量遠遠比其餘reduce處理的數據量要大,形成其餘的reduce數據都處理完了,這個還沒處理完

怎麼發現的數據傾斜,如何出現的數據傾斜,怎麼解決的數據傾斜

空key的打散
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;
經過將空key打散成不一樣的隨機字符串,就能夠解決咱們hive的數據傾斜的問題oop

hive第三個優化:map端的join
hive已經開啓了自動的map端的join功能,不論是咱們的大表join小表,仍是小表join大表,都會將咱們的小表加載到內存當中來
首先第一步:啓動一個local的task,尋找哪一個表的數據是小表數據


hive的group by優化:能在map端聚合的數據,就儘可能在map端進行聚合
多加一層mr的程序,讓咱們的數據實現均衡的負載,避免數據的傾斜
測試

count(distinct)的優化:
這種寫法效率低下:SELECT count(DISTINCT id) FROM bigtable;
能夠準換成這種寫法:SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;
fetch

笛卡爾積:任什麼時候候都要避免笛卡爾積,避免無效的on條件
select from A left join B -- on A.id = B.id

使用分區裁剪,列裁剪:
分區裁剪:若是是咱們的分區表,那麼查詢的時候,儘可能帶上咱們的分區條件
列裁剪:儘可能避免使用select * ,須要查詢哪些列,就選擇哪些列

動態分區調整:
分區表的數據加載兩種方式:
load data inpath '/export/xxx' into table xxx partition(month = 'xxx')
insert overwrite table xxx partition (month = 'xxx') select xxx

使用動態分區動態的添加數據
若是要使用動態分區添加數據,最後一個字段必定要是咱們的分區字段
INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time
FROM ori_partitioned;

==========================================================

9、調優

9.1 Fetch抓取(Hive能夠避免進行MapReduce)

Hive中對某些狀況的查詢能夠沒必要使用MapReduce計算。例如:SELECT * FROM employees;在這種狀況下,Hive能夠簡單地讀取employee對應的存儲目錄下的文件,而後輸出查詢結果到控制檯。

在hive-default.xml.template文件中hive.fetch.task.conversion默認是more,老版本hive默認是minimal,該屬性修改成more之後,在全局查找、字段查找、limit查找等都不走mapreduce。

<property>

    <name>hive.fetch.task.conversion</name>

    <value>more</value>

    <description>

      Expects one of [none, minimal, more].

      Some select queries can be converted to single FETCH task minimizing latency.

      Currently the query should be single sourced not having any subquery and should not have

      any aggregations or distincts (which incurs RS), lateral views and joins.

      0. none : disable hive.fetch.task.conversion

      1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 全部的都要走MR

      2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

    </description>

  </property>

案例實操:

       1)把hive.fetch.task.conversion設置成none,而後執行查詢語句,都會執行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

2)把hive.fetch.task.conversion設置成more,而後執行查詢語句,以下查詢方式都不會執行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

9.2 本地模式

大多數的Hadoop Job是須要Hadoop提供的完整的可擴展性來處理大數據集的。不過,有時Hive的輸入數據量是很是小的。在這種狀況下,爲查詢觸發執行任務時消耗可能會比實際job的執行時間要多的多。對於大多數這種狀況,Hive能夠經過本地模式在單臺機器上處理全部的任務。對於小數據集,執行時間能夠明顯被縮短。

用戶能夠經過設置hive.exec.mode.local.auto的值爲true,來讓Hive在適當的時候自動啓動這個優化。

set hive.exec.mode.local.auto=true;  //開啓本地mr

//設置local mr的最大輸入數據量,當輸入數據量小於這個值時採用local  mr的方式,默認爲134217728,即128M

set hive.exec.mode.local.auto.inputbytes.max=51234560;

//設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時採用local mr的方式,默認爲4

set hive.exec.mode.local.auto.input.files.max=10;

案例實操:

1)開啓本地模式,並執行查詢語句

hive (default)> set hive.exec.mode.local.auto=true; 

hive (default)> select * from score cluster by s_id;

18 rows selected (1.568 seconds)

2)關閉本地模式,並執行查詢語句

hive (default)> set hive.exec.mode.local.auto=false; 

hive (default)> select * from score cluster by s_id;

18 rows selected (11.865 seconds)

9.2 表的優化

9.2.1 Join

Join原則:

1)小表Join大表,

將key相對分散,而且數據量小的表放在join的左邊,這樣能夠有效減小內存溢出錯誤發生的概率;再進一步,可使用Group讓小的維度表(1000條如下的記錄條數)先進內存。在map端完成reduce。

select  count(distinct s_id)  from score;

select count(s_id) from score group by s_id; 在map端進行聚合,效率更高

select count(1) from (select s_id from score group by s_id) t;

 

 

2)多個表關聯時,最好分拆成小段,避免大sql(沒法控制中間Job)

3)大表Join大表

(1)空KEY過濾

有時join超時是由於某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而致使內存不夠。此時咱們應該仔細分析這些異常的key,不少狀況下,這些key對應的數據是異常數據,咱們須要在SQL語句中進行過濾。例如key對應的字段爲空,操做以下:

環境準備:

create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/export/servers/hivedatas/hive_big_table/*' into table ori;

load data local inpath '/export/servers/hivedatas/hive_have_null_id/*' into table nullidtable;

 

不過濾:

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;

結果:

No rows affected (152.135 seconds)

 

過濾:

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL) a JOIN ori b ON a.id = b.id;

結果:

No rows affected (141.585 seconds)

 (2)空key轉換

有時雖然某個key爲空對應的數據不少,可是相應的數據不是異常數據,必需要包含在join的結果中,此時咱們能夠表a中key爲空的字段賦一個隨機的值,使得數據隨機均勻地分佈到不一樣的reducer上。例如:

不隨機分佈:

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;

No rows affected (41.668 seconds)   52.477

結果:這樣的後果就是全部爲null值的id所有都變成了相同的字符串,及其容易形成數據的傾斜(全部的key相同,相同key的數據會到同一個reduce當中去)

 (3)空key打散

爲了解決這種狀況,咱們能夠經過hiverand函數,隨機地給每個爲空的id賦上一個隨機值,這樣就不會形成數據傾斜

隨機分佈:

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;

No rows affected (42.594 seconds)

 

 

 

 

4)案例實操

(0)需求:測試大表JOIN小表和小表JOIN大表的效率 (新的版本當中已經沒有區別了,舊的版本當中須要使用小表)

(1)建大表、小表和JOIN後表的語句

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table jointable2(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(2)分別向大表和小表中導入數據

hive (default)> load data local inpath '/export/servers/hivedatas/big_data' into table bigtable;

hive (default)>load data local inpath '/export/servers/hivedatas/small_data' into table smalltable;

(3)關閉mapjoin功能(默認是打開的)

set hive.auto.convert.join = false;

(4)執行小表JOIN大表語句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM smalltable s

left JOIN bigtable  b

ON b.id = s.id;

Time taken: 67.411 seconds 

 

(5)執行大表JOIN小表語句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM bigtable  b

left JOIN smalltable  s

ON s.id = b.id;

Time taken: 69.376seconds

能夠看出大表join小表或者小表join大表,就算是關閉map端join的狀況下,在新的版本當中基本上沒有區別了(hive爲了解決數據傾斜的問題,會自動進行過濾)

9.2.2 MapJoin

若是不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會將Join操做轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。能夠用MapJoin把小表所有加載到內存在map端進行join,避免reducer處理。

1)開啓MapJoin參數設置:

(1)設置自動選擇Mapjoin

set hive.auto.convert.join = true; 默認爲true

(2)大表小表的閾值設置(默認25M如下認爲是小表):

set hive.mapjoin.smalltable.filesize=25123456;

2)MapJoin工做機制

 

首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,並寫入本地的文件中,以後將該文件加載到DistributeCache中。

接下來是Task B,該任務是一個沒有Reduce的MR,啓動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,並直接輸出結果。

因爲MapJoin沒有Reduce,因此由Map直接輸出結果文件,有多少個Map Task,就有多少個結果文件。

案例實操:

(1)開啓Mapjoin功能

set hive.auto.convert.join = true; 默認爲true

(2)執行小表JOIN大表語句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM smalltable s

JOIN bigtable  b

ON s.id = b.id;

Time taken: 31.814 seconds

 

(3)執行大表JOIN小表語句

INSERT OVERWRITE TABLE jointable2

SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url

FROM bigtable  b

JOIN smalltable  s

ON s.id = b.id;

Time taken: 28.46 seconds

9.2.3 Group By

默認狀況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。

    並非全部的聚合操做都須要在Reduce端完成,不少聚合操做均可以先在Map端進行部分聚合,最後在Reduce端得出最終結果。

1)開啓Map端聚合參數設置

       (1)是否在Map端進行聚合,默認爲True

set hive.map.aggr = true;

(2)在Map端進行聚合操做的條目數目

    set hive.groupby.mapaggr.checkinterval = 100000;

(3)有數據傾斜的時候進行負載均衡(默認是false)

    set hive.groupby.skewindata = true;

    當選項設定爲 true,生成的查詢計劃會有兩個MR Job。第一個MR Job中,Map的輸出結果會隨機分佈到Reduce中,每一個Reduce作部分聚合操做,並輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不一樣的Reduce中,從而達到負載均衡的目的;第二個MR Job再根據預處理的數據結果按照Group By Key分佈到Reduce中(這個過程能夠保證相同的Group By Key被分佈到同一個Reduce中),最後完成最終的聚合操做。

9.2.4 Count(distinct)

數據量小的時候無所謂,數據量大的狀況下,因爲COUNT DISTINCT操做須要用一個Reduce Task來完成,這一個Reduce須要處理的數據量太大,就會致使整個Job很難完成,通常COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:

環境準備:

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/home/admin/softwares/data/100萬條大表數據(id除以10取整)/bigtable' into table bigtable;

 

set hive.exec.reducers.bytes.per.reducer=32123456;

SELECT count(DISTINCT id) FROM bigtable;

結果:

c0

10000

Time taken: 35.49 seconds, Fetched: 1 row(s)

能夠轉換成:

set hive.exec.reducers.bytes.per.reducer=32123456;

SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;

結果:

Stage-Stage-1: Map: 1  Reduce: 4   Cumulative CPU: 13.07 sec   HDFS Read: 120749896 HDFS Write: 464 SUCCESS

Stage-Stage-2: Map: 3  Reduce: 1   Cumulative CPU: 5.14 sec   HDFS Read: 8987 HDFS Write: 7 SUCCESS

_c0

10000

Time taken: 51.202 seconds, Fetched: 1 row(s)

雖然會多用一個Job來完成,但在數據量大的狀況下,這個絕對是值得的。

9.2.5 笛卡爾積

儘可能避免笛卡爾積,即避免join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積。

9.2.6 使用分區剪裁、列剪裁

在SELECT中,只拿須要的列,若是有,儘可能使用分區過濾,少用SELECT *。

在分區剪裁中,當使用外關聯時,若是將副表的過濾條件寫在Where後面,那麼就會先全表關聯,以後再過濾,好比:

環境準備:

create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

 

load data local inpath '/home/admin/softwares/data/加遞增id的原始數據/ori' into table ori;

 

load data local inpath '/home/admin/softwares/data/100萬條大表數據(id除以10取整)/bigtable' into table bigtable;

先關聯再Where:

SELECT a.id

FROM bigtable a

LEFT JOIN ori b ON a.id = b.id

WHERE b.id <= 10;

正確的寫法是寫在ON後面:先Where再關聯

SELECT a.id

FROM ori a

LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);

或者直接寫成子查詢:

SELECT a.id

FROM bigtable a

RIGHT JOIN (SELECT id

FROM ori

WHERE id <= 10

) b ON a.id = b.id;

9.2.7 動態分區調整

關係型數據庫中,對分區表Insert數據時候,數據庫自動會根據分區字段的值,將數據插入到相應的分區中,Hive中也提供了相似的機制,即動態分區(Dynamic Partition),只不過,使用Hive的動態分區,須要進行相應的配置。

 

說白了就是以第一個表的分區規則,來對應第二個表的分區規則,將第一個表的全部分區,所有拷貝到第二個表中來,第二個表在加載數據的時候,不須要指定分區了,直接用第一個表的分區便可

 

 

1)開啓動態分區參數設置

(1)開啓動態分區功能(默認true,開啓)

set hive.exec.dynamic.partition=true;

(2)設置爲非嚴格模式(動態分區的模式,默認strict,表示必須指定至少一個分區爲靜態分區,nonstrict模式表示容許全部的分區字段均可以使用動態分區。)

set hive.exec.dynamic.partition.mode=nonstrict;

(3)在全部執行MR的節點上,最大一共能夠建立多少個動態分區。

set  hive.exec.max.dynamic.partitions=1000;

       (4)在每一個執行MR的節點上,最大能夠建立多少個動態分區。該參數須要根據實際的數據來設定。好比:源數據中包含了一年的數據,即day字段有365個值,那麼該參數就須要設置成大於365,若是使用默認值100,則會報錯。

set hive.exec.max.dynamic.partitions.pernode=100

(5)整個MR Job中,最大能夠建立多少個HDFS文件。

       在linux系統當中,每一個linux用戶最多能夠開啓1024個進程,每個進程最多能夠打開2048個文件,即持有2048個文件句柄,下面這個值越大,就能夠打開文件句柄越大

set hive.exec.max.created.files=100000;

(6)當有空分區生成時,是否拋出異常。通常不須要設置。

set hive.error.on.empty.partition=false;

2)案例實操

需求:將ori中的數據按照時間(如:20111231234568),插入到目標表ori_partitioned的相應分區中。

(1)準備數據原表

create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)

PARTITIONED BY (p_time bigint)

row format delimited fields terminated by '\t';

 

load data local inpath '/export/servers/hivedatas/small_data' into  table ori_partitioned partition (p_time='20111230000010');

 

load data local inpath '/export/servers/hivedatas/small_data' into  table ori_partitioned partition (p_time='20111230000011');

(2)建立分區表

create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';

(3)分析

若是按照以前介紹的往指定一個分區中Insert數據,那麼這個需求很不容易實現。這時候就須要使用動態分區來實現。

set hive.exec.dynamic.partition = true;

set hive.exec.dynamic.partition.mode = nonstrict;

set hive.exec.max.dynamic.partitions = 1000;

set hive.exec.max.dynamic.partitions.pernode = 100;

set hive.exec.max.created.files = 100000;

set hive.error.on.empty.partition = false;

 

INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)

SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time

FROM ori_partitioned;

注意:在PARTITION (month,day)中指定分區字段名便可;

在SELECT子句的最後幾個字段,必須對應前面PARTITION (month,day)中指定的分區字段,包括順序。

查看分區

hive> show partitions ori_partitioned_target;

OK

p_time=20111230000010

p_time=20111230000011

9.2.8 分桶

參見分桶表

相關文章
相關標籤/搜索