Hive調優

Hive調優原則

 

Hive是將符合SQL語法的字符串解析生成能夠在Hadoop上執行的MapReduce的工具。理解Hadoop的核心能力,是Hive 優化的根本。使用Hive儘可能按照分佈式計算的一些特色來設計SQL,Hive的調優原則主要包括如下幾點:node

 

原子化操做

儘可能原子化操做,避免一個SQL包含複雜邏輯,可使用中間表來完成複雜的邏輯,從而提升總體執行效率。通常來講,單個SQL所起的JOB個數儘可能控制在5個如下。若是Union All的部分個數大於2,或者每一個Union部分數據量大,應該拆分紅多個Insert Into語句apache

 

充分利用服務器資源

讓服務器儘量的多作事情,充分利用服務器的計算資源,以實現最高的系統吞吐量爲目標。好比一個做業能作完的事情就不要拆開兩個做業來完成。Reduce個數過少不能真正發揮Hadoop並行計算的優點,但Reduce個數過多,會形成大量小文件問題,因此須要根據數據量和資源狀況找到一個折衷點。服務器

Hive能夠將沒有依賴關係的屢次MR過程(例如Union All語義中的多個子查詢)併發提交。使用hive.exec.parallel參數控制在同一個SQL中的不一樣做業是否能夠同時運行,從而提升做業的併發,充分利用服務器的資源:session

SET hive.exec.parallel=true;併發

SET hive.exec.parallel.thread.number=最大併發job數;app

 

選擇最優的執行路徑

讓服務器儘可能少作事情,選擇最優的執行路徑,以資源消耗最少爲目標。好比負載均衡

注意Join的使用jvm

兩表關聯時,若其中有一個表很小,則使用Map Join,不然使用普通的Reduce Join分佈式

SET hive.auto.convert.join=true ;函數

SET hive.smalltable.filesize=25000000L(默認是25M);

 

注意小文件的問題

在Hive裏有兩種比較常見的處理辦法。第一是使用Combinefileinputformat,將多個小文件打包做爲一個總體的Inputsplit,減小Map任務數;

SET mapred.max.split.size=128000000;

SET mapred.min.split.size=128000000;

SET mapred.min.split.size.per.node=128000000;

SET mapred.min.split.size.per.rack=128000000;

SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

 

第二是文件數目小,容易在文件存儲端形成瓶頸,給 HDFS 帶來壓力,影響處理效率。對此,能夠經過合併Map和Reduce的結果文件來消除這樣的影響。

設置Hive參數,將額外啓動一個MR Job打包小文件。

用於設置合併屬性的參數有:

是否合併Map輸出文件:SET hive.merge.mapfiles=true(默認值爲真)

是否合併Reduce 端輸出文件:SET hive.merge.mapredfiles= true(默認值爲假)

合併文件的大小:SET hive.merge.size.per.task=256*1000*1000(默認值爲 256000000)

 

注意執行過程當中的數據傾斜問題

在Hive調優中比較經常使用的處理辦法有兩種:

第一,經過hive.groupby.skewindata = true控制生成兩個MR Job,第一個MR Job Map的輸出結果隨機分配到Reduce作次預彙總,減小某些key值條數過多或太小形成的數據傾斜問題。

第二,經過hive.map.aggr = true(默認爲true)在Map端作Combiner,假如map各條數據基本上不同,聚合沒什麼意義,作Combiner反而多此一舉,Hive會經過如下兩個參數:

hive.groupby.mapaggr.checkinterval = 100000 (默認)

hive.map.aggr.hash.min.reduction = 0.5(默認)

預先取100000條數據聚合,若是聚合後的條數/100000>0.5,則再也不作聚合。

 

合理設置Map與Reduce的個數

增長Map數

同時可執行的Map數是有限的,一般狀況下,做業會經過Input的目錄產生一個或者多個Map任務,而主要的決定因素是Input的文件總個數和Input的文件大小。

若是表A只有一個文件,且大小超過100M,包含上千萬記錄,任務較爲耗時,能夠考慮用多個Map任務完成,有效提高性能

合理設置Reduce數

增長map數能夠經過控制一個做業的Reduce數來加以控制。Reduce個數的設定會極大影響執行效率,通常基於如下參數設定:

hive.exec.reducers.bytes.per.reducer(每一個reduce任務處理的數據量,如1G)

hive.exec.reducers.max(每一個任務最大的reduce數)

若是Reduce的輸入(Map的輸出)總大小不超過1G,那麼只會有一個Reduce任務。能夠根據實際狀況,經過縮小每一個Reduce任務處理的數據量來提升執行性能。

 

Hive SQL調優

Hive查詢生成多個Map Reduce做業,一個Map Reduce做業又有map,reduce,spill,shuffle,sort等多個階段,因此針對Hive SQL的優化能夠大體分爲針對MR中單個步驟的優化,針對MR全局的優化以及針對整個查詢的優化。Hive SQL的調優貫穿全部階段,主要是解決運行過程當中數據傾斜問題,減小做業數,對小文件進行合併處理,合理設置Map Reduce的任務數等,根據數據量和模型狀況,經過迭代調測來有效提高性能。如下是常見的Hive SQL調優方法:

 

Hive join優化

減小沒必要要的關聯

Hive SQL和其餘SQL同樣,是一種功能強大的說明性語言,對於同一個業務功能,能夠經過不一樣的寫法來實現,而不一樣的寫法會產生不一樣的性能特色。

例如一個點擊率表,包括了訪問時間、SessionID、網址和來源IP地址共四個字段:

CREATE TABLE clicks (

timestamp date, sessionID string, url string, source_ip string)

STORED as ORC tblproperties (「orc.compress」 = 「SNAPPY」);

假設咱們須要查找每一個SessionID最後一次的訪問網址:

方法一:

SELECT clicks.* FROM clicks inner join

(select sessionID, max(timestamp) as max_ts from clicks group by sessionID) latest

ON clicks.sessionID = latest.sessionID and clicks.timestamp = latest.max_ts;

該方法使用了子查詢方法去收集每一個SessionID的最後訪問時間,而後經過Inner Join自關聯去排除掉以前的點擊訪問記錄,效率較低。

方法二:

SELECT * FROM (SELECT *, RANK() over (partition by sessionID,order by timestamp desc) as rank FROM clicks) ranked_clicks

WHERE ranked_clicks.rank=1;

這裏使用了OLAP的排位函數去實現相同的業務查詢,關鍵不須要表關聯,僅爲單表操做,刪除沒必要要的關聯在大數據開發上意義重大,能大幅提升性能。

 

帶表分區的Join

Hive是先關聯再進行Where條件判斷,若是在右表b中找不到左表a表的記錄,b表中的因此列都會顯示爲NULL,這種狀況下left outer的查詢結果與where子句無關,解決辦法是將Where條件放在JOIN ON的關聯判斷條件中。

SQL編寫須要儘早過濾數據,以減小各個階段的數據量,只選擇須要用到的字段。

 

Skew Join優化

優化Skewed Join Key爲Map Join。開啓hive.optimize.skewjoin=true可優化處理過程當中傾斜的數據。但須要注意Skew Join優化須要額外的Map Join操做,且不能節省Shuffle的代價。

 

利用隨機數減小數據傾斜

大表之間Join容易由於空值而產生數據傾斜,除了經過過濾方法排除空值,還能夠利用隨機數分散到不一樣的Reduce上,例如:

select a.uid from big_table_a a left outer join big_table_b b on b.uid = case when a.uid is null or length(a.uid)=0 then concat('rd_sid',rand()) else a.uid end;

把空值的Key變成一個字符串加上隨機數,就能把傾斜的數據分到不一樣的Reduce上,解決數據傾斜問題。由於空值不參與關聯,即便分到不一樣的Reduce上,也不影響最終的結果,並且IO減小,做業數也減小了,執行效率更優。

 

Group by

Group By是在Reduce階段的操做,防止數據傾斜:

Map端聚合,提早一部分計算:Hive.map.aggr = true

hive.groupby.skewindata爲ture的時候,生成的查詢計劃會有兩個MRJob:

第一個MRJob 中,Map的輸出結果集合會隨機分佈到Reduce中,每一個Reduce作部分聚合操做,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不一樣的Reduce中,從而達到負載均衡的目的。

第二個MRJob再根據預處理的數據結果按照GroupBy Key分佈到Reduce中(這個過程能夠保證相同的GroupBy Key被分佈到同一個Reduce中),最後完成最終的聚合操做。

Count Distinct

數據量小的時候無所謂,數據量大的狀況下,因爲COUNT DISTINCT操做須要用一個Reduce Task來完成,這一個Reduce須要處理的數據量太大,就會致使整個Job很難完成,通常COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:

SELECT day,

COUNT(DISTINCT id)AS uv

FROM test

GROUP BY day

能夠轉換成:

SELECT day,

COUNT(id) AS uv

FROM (SELECTday,id FROM test GROUP BY day,id) a

GROUP BY day;

雖然會多用一個Job來完成,但在數據量大的狀況下,這個絕對是值得的。

Order by VS Sort by

Order by是在全局的排序,只用一個Reduce去運行,因此在SET Hive.mapred.mode=strict 模式下,不容許執行如下查詢:

沒有limit限制的order by語句

分區表上沒有指定分區

笛卡爾積(JOIN時沒有ON語句)。

而Sort by是在每一個Reduce內排序,只保證同一個Reduce下排序正確。

 

通用Hive SQL優化方法

一、儘可能利用分區,好比按時間進行分區。

二、關聯條件不能忽略,避免Select *。

三、關聯字段的類型保持一致,避免字段的強制轉換。

四、避免使用LIKE 進行模糊匹配的查詢。

五、對查詢進行優化,要儘可能避免全表掃描

 

 

經常使用參數

(經常使用的一些可設置參數,具體數值按照須要進行調整!)

SET hive.optimize.skewjoin = true;

SET hive.skewjoin.key = 100000;

SET hive.exec.dynamic.partition.mode = nonstrict;

SET mapred.reducer.tasks = 50;

 

// Hive中間結果壓縮和壓縮輸出

SET hive.exec.compress.output = true; -- 默認false

SET hive.exec.compress.intermediate = true; -- 默認false

SET mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec; -- 默認org.apache.hadoop.io.compress.DefaultCodec

SET mapred.output.compression.type = BLOCK; -- 默認BLOCK

 

// 輸出合併小文件

SET hive.merge.mapfiles = true; -- 默認true,在map-only任務結束時合併小文件

SET hive.merge.mapredfiles = true; -- 默認false,在map-reduce任務結束時合併小文件

SET hive.merge.size.per.task = 268435456; -- 默認256M

SET hive.merge.smallfiles.avgsize = 16777216; -- 當輸出文件的平均大小小於該值時,啓動一個獨立的map-reduce任務進行文件merge

 

// 設置map和reduce數量

SET mapred.max.split.size = 256000000;

SET mapred.min.split.size = 64000000;

SET mapred.min.split.size.per.node = 64000000;

SET mapred.min.split.size.per.rack = 64000000;

SET hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

SET hive.exec.reducers.bytes.per.reducer = 256000000; -- 默認64M,每一個reducer處理的數據量大小

 

// 設置數據傾斜和並行化

SET hive.exec.parallel = true; -- 並行執行

SET hive.exec.parallel.thread.number = 16;

SET mapred.job.reuse.jvm.num.tasks = 10;

SET hive.exec.dynamic.partition = true;

SET hive.optimize.cp = true; -- 列裁剪

SET hive.optimize.pruner = true; -- 分區裁剪

SET hive.groupby.skewindata = true; -- groupby數據傾斜

SET hive.exec.mode.local.auto = true; --本地執行

SET hive.exec.mode.local.auto.input.files.max = 10; //map數默認是4,當map數小於10就會啓動任務本地執行

SET hive.exec.mode.local.auto.inputbytes.max = 128000000 --默認是128M

 

//關閉如下兩個參數來完成關閉Hive任務的推測執行

SET mapred.map.tasks.speculative.execution=false;

SET mapred.reduce.tasks.speculative.execution=false;

相關文章
相關標籤/搜索