一篇文章讓你通透Hive優化

1、hive參數優化

一、map數優化

mapred.min.split.size: 指的是數據的最小分割單元大小;min的默認值是1B
mapred.max.split.size: 指的是數據的最大分割單元大小;max的默認值是256MB
經過調整max能夠起到調整map數的做用,減少max能夠增長map數,增大max能夠減小map數。
須要提醒的是,直接調整mapred.map.tasks這個參數是沒有效果的。


node

舉例:python

  a) 假設input目錄下有1個文件a,大小爲780M,那麼hadoop會將該文件a分隔成7個塊(6個128M的塊和1個12M的塊),從而產生7個map書;算法

  b) 假設input目錄下有3個文件a,b,c,大小分別爲10M,20M,130M,那麼hadoop會分隔成4個塊(10M,20M,128M,2M),從而產生4個map數;sql

  注意:若是文件大於塊大小(128M),那麼會拆分,若是小於塊大小,則把該文件當成一個塊。shell

  其實這就涉及到小文件的問題:若是一個任務有不少小文件(遠遠小於塊大小128M),則每一個小文件也會當作一個塊,用一個map任務來完成。數據庫

  而一個map任務啓動和初始化的時間遠遠大於邏輯處理的時間,就會形成很大的資源浪費。並且,同時可執行的map數是受限的。那麼,是否是保證每一個map處理接近128M的文件塊,就高枕無憂了?答案也是不必定。好比有一個127M的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,若是map處理的邏輯比較複雜,用一個map任務去作,確定也比較耗時。apache

  咱們該如何去解決呢???api

  咱們須要採起兩種方式來解決:即減小map數和增長map數。網絡

  一、減小map數量併發

假設一個SQL任務:
Select count(1) from popt_tbaccountcopy_meswhere pt = '2012-07-04';
該任務的inputdir :  /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04
共有194個文件,其中不少事遠遠小於128M的小文件,總大小9G,正常執行會用194個map任務。
Map總共消耗的計算資源:SLOTS_MILLIS_MAPS= 623,020

經過如下方法來在map執行前合併小文件,減小map數:
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
再執行上面的語句,用了74個map任務,map消耗的計算資源:SLOTS_MILLIS_MAPS= 333,500
對於這個簡單SQL任務,執行時間上可能差很少,但節省了一半的計算資源。
大概解釋一下,100000000表示100M, 
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;這個參數表示執行前進行小文件合併,
前面三個參數肯定合併文件塊的大小,大於文件塊大小128m的,按照128m來分隔,
小於128m,大於100m的,按照100m來分隔,把那些小於100m的(包括小文件和分隔大文件剩下的),
進行合併,最終生成了74個塊。

二、增大map數量

  如何適當的增長map數?
  當input的文件都很大,任務邏輯複雜,map執行很是慢的時候,能夠考慮增長Map數,來使得每一個map處理的數據量減小,從而提升任務的執行效率。

假設有這樣一個任務:
    Select data_desc,
               count(1),
               count(distinct id),
               sum(case when ...),
               sum(case when ...),
               sum(...)
    from a group by data_desc

  若是表a只有一個文件,大小爲120M,但包含幾千萬的記錄,若是用1個map去完成這個任務,確定是比較耗時的,這種狀況下,咱們要考慮將這一個文件合理的拆分紅多個,這樣就能夠用多個map任務去完成。

set mapred.reduce.tasks=10;
  create table a_1 as 
  select * from a 
  distribute by rand(123);

  這樣會將a表的記錄,隨機的分散到包含10個文件的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務去完成。每一個map任務處理大於12M(幾百萬記錄)的數據,效率確定會好不少。

二、reduce數優化

  Reduce的個數對整個做業的運行性能有很大影響。若是Reduce設置的過大,那麼將會產生不少小文件,對NameNode會產生必定的影響,並且整個做業的運行時間未必會減小;若是Reduce設置的太小,那麼單個Reduce處理的數據將會加大,極可能會引發OOM異常。

  若是設置了mapred.reduce.tasks/mapreduce.job.reduces參數,那麼Hive會直接使用它的值做爲Reduce的個數;若是mapred.reduce.tasks/mapreduce.job.reduces的值沒有設置(也就是-1),那麼Hive會根據輸入文件的大小估算出Reduce的個數。根據輸入文件估算Reduce的個數可能未必很準確,由於Reduce的輸入是Map的輸出,而Map的輸出可能會比輸入要小,因此最準確的數根據Map的輸出估算Reduce的個數。

本身如何肯定reduce數:

  reduce個數的設定極大影響任務執行效率,不指定reduce個數的狀況下,Hive會猜想肯定一個reduce個數,基於如下兩個設定:

hive.exec.reducers.bytes.per.reducer(每一個reduce任務處理的數據量,默認爲1000^3=1G)

hive.exec.reducers.max(每一個任務最大的reduce數,默認爲1009

  計算reducer數的公式很簡單N=min(參數2,總輸入數據量/參數1)。即,若是reduce的輸入(map的輸出)總大小不超過1G,那麼只會有一個reduce任務。

如:select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt;
  總大小爲9G多
  所以這句有10個reduce

  1. 調整reduce個數方法一

(1)每一個Reduce處理的數據量默認是256MB
hive.exec.reducers.bytes.per.reducer=256123456
(2)每一個任務最大的reduce數,默認爲1009
hive.exec.reducers.max=1009


  1. 調整reduce個數方法二

在hadoop的mapred-default.xml文件中修改
設置每一個job的Reduce個數
set mapreduce.job.reduces = 15;

  1. reduce個數並非越多越好

   1)過多的啓動和初始化reduce也會消耗時間和資源;

   2)另外,有多少個reduce,就會有多少個輸出文件,若是生成了不少個小文件,那麼若是這些小文件做爲下一個任務的輸入,則也會出現小文件過多的問題;

  注意:在設置reduce個數的時候也須要考慮這兩個原則:使大數據量利用合適的reduce數;是單個reduce任務處理合適的數據量。

三、Fetch抓取(Hive能夠避免進行MapReduce)

  Hive中對某些狀況的查詢能夠沒必要使用MapReduce計算。例如:SELECT * FROM employees;在這種狀況下,Hive能夠簡單地讀取employee對應的存儲目錄下的文件,而後輸出查詢結果到控制檯。
  在hive-default.xml.template文件中hive.fetch.task.conversion默認是more,老版本hive默認是minimal,該屬性修改成more之後,在全局查找、字段查找、limit查找等都不走MapReduce。

案例實操

  • 1)把hive.fetch.task.conversion設置成none,而後執行查詢語句,都會執行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;
  • 2)把hive.fetch.task.conversion設置成more,而後執行查詢語句,以下查詢方式都不會執行mapreduce程序。
hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

4 、模式選擇

 1)本地模式

  大多數的Hadoop job是須要Hadoop提供的完整的可擴展性來處理大數據集的。不過,有時Hive的輸入數據量是很是小的。在這種狀況下,爲查詢觸發執行任務時消耗可能會比實際job的執行時間要多的多。對於大多數這種狀況,Hive能夠經過本地模式在單臺機器上處理全部的任務。對於小數據集,執行時間能夠明顯被縮短。
  用戶能夠經過設置hive.exec.mode.local.auto的值爲true,來讓Hive在適當的時候自動啓動這個優化。本地模式涉及到三個參數:

參數名 默認值 備註
hive.exec.mode.local.auto false 讓hive決定是否在本地模式自動運行
hive.exec.mode.local.auto.input.files.max 4 不啓動本地模式的task最大個數
hive.exec.mode.local.auto.inputbytes.max 128M 不啓動本地模式的最大輸入文件大小
set hive.exec.mode.local.auto=true;  //開啓本地mr
//設置local mr的最大輸入數據量,當輸入數據量小於這個值時採用local mr的方式,默認爲134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時採用local mr的方式,默認爲4
set hive.exec.mode.local.auto.input.files.max=10;

案例實操:

1)開啓本地模式,並執行查詢語句
hive (default)> set hive.exec.mode.local.auto=true; 
hive (default)> select * from score cluster by s_id;
18 rows selected (1.568 seconds)
2)關閉本地模式,並執行查詢語句
hive (default)> set hive.exec.mode.local.auto=false; 
hive (default)> select * from score cluster by s_id;
18 rows selected (11.865 seconds)

 2)並行模式

  Hive會將一個查詢轉化成一個或多個階段。這樣的階段能夠是MapReduce階段、抽樣階段、合併階段、limit階段。默認狀況下,Hive一次只會執行一個階段,因爲job包含多個階段,而這些階段並不是徹底相互依賴,即:這些階段能夠並行執行,能夠縮短整個job的執行時間。設置參數,set hive.exec.parallel=true或者經過配置文件來完成。

 3)嚴格模式

  Hive提供一個嚴格模式,能夠防止用戶執行那些可能產生意想不到的影響查詢,經過設置hive.mapred.mode值爲strict來完成。默認是非嚴格模式nonstrict。

  1)對於分區表,除非where語句中含有分區字段過濾條件來限制範圍,不然不容許執行。換句話說,就是用戶不容許掃描全部分區。進行這個限制的緣由是,一般分區表都擁有很是大的數據集,並且數據增長迅速。沒有進行分區限制的查詢可能會消耗使人不可接受的巨大資源來處理這個表。

  2)對於使用了order by語句的查詢,要求必須使用limit語句。由於order by爲了執行排序過程會將全部的結果數據分發到同一個Reducer中進行處理,強制要求用戶增長這個LIMIT語句能夠防止Reducer額外執行很長一段時間。

  3)限制笛卡爾積的查詢。對關係型數據庫很是瞭解的用戶可能指望在執行JOIN查詢的時候不使用ON語句而是使用where語句,這樣關係數據庫的執行優化器就能夠高效地將WHERE語句轉化成那個ON語句。不幸的是,Hive並不會執行這種優化,所以,若是表足夠大,那麼這個查詢就會出現不可控的狀況。

五、JVM重用

  JVM重用是Hadoop調優參數的內容,其對Hive的性能具備很是大的影響,特別是對於很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短。

  JVM重用可使得JVM實例在同一個job中從新使用N次。N的值能夠在Hadoop的mapred-site.xml文件中進行配置。一般在10-20之間,具體多少須要根據具體業務場景測試得出。

<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit. 
  </description>
</property>

  咱們也能夠在hive當中經過

set  mapred.job.reuse.jvm.num.tasks=10;

  這個設置來設置咱們的jvm重用。

缺點

  開啓JVM重用將一直佔用使用到的task插槽,直到任務完成後才能釋放。若是某個「不平衡的」job中有某幾個Reduce task執行的時間要比其餘Reduce task消耗的時間多的多的話,那麼保留的插槽就會一直空閒着卻沒法被其餘的job使用,直到全部的task都結束了纔會釋放。

六、推測執行

  在分佈式集羣環境下,由於程序Bug(包括Hadoop自己的bug),負載不均衡或者資源分佈不均等緣由,會形成同一個做業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢於其餘任務(好比一個做業的某個任務進度只有50%,而其餘全部任務已經運行完畢),則這些任務會拖慢做業的總體執行進度。爲了不這種狀況發生,Hadoop採用了推測執行(Speculative Execution)機制,它根據必定的法則推測出「拖後腿」的任務,併爲這樣的任務啓動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最早成功運行完成任務的計算結果做爲最終結果。
hadoop的推測執行功能由mapred-site.xml文件中的2個參數決定:

<property>
	<name> mapred.map.tasks.speculative.execution </name>
	<value>true</value>
</property>

<property>
	<name> mapred.reduce.tasks.speculative.execution</name>
	<value>true</value>
</property>

hive自己也有控制推測執行的參數,能夠在hive-site.xml文件中配置:

<property>
	<name>hive.mapred.reduce.tasks.speculative.execution </name>
	<value>true</value>
</property>

hive中推測執行參數默認值以下:

hive (default)> set mapred.map.tasks.speculative.execution;

mapred.map.tasks.speculative.execution=true

hive (default)> set mapred.reduce.tasks.speculative.execution;

mapred.reduce.tasks.speculative.execution=true

hive (default)> set hive.mapred.reduce.tasks.speculative.execution;

hive.mapred.reduce.tasks.speculative.execution=true

  關於調優這些推測執行變量,還很難給一個具體的建議。若是用戶對於運行時的誤差很是敏感的話,那麼能夠將這些功能關閉掉。若是用戶由於輸入數據量很大而須要執行長時間的map task或者reduce task的話,那麼啓動推測執行形成的浪費是很是巨大大。

七、並行執行

  Hive會將一個查詢轉化成一個或者多個階段。例如:MapReduce階段、抽樣階段、合併階段、limit階段。或者Hive執行過程當中可能須要的其餘階段。默認狀況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含衆多的階段,而這些階段可能並不是徹底互相依賴的,也就是說有些階段是能夠並行執行的,這樣可能使得整個job的執行時間縮短。不過,若是有更多的階段能夠並行執行,那麼job可能就越快完成。

// 開啓任務並行執行
 set hive.exec.parallel=true;
// 同一個sql容許並行任務的最大線程數 
set hive.exec.parallel.thread.number=8;

八、合併小文件

  小文件的產生有三個地方,map輸入,map輸出,reduce輸出,小文件過多也會影響hive的分析效率:

設置map輸入的小文件合併

set mapred.max.split.size=256000000;  
//一個節點上split的至少的大小(這個值決定了多個DataNode上的文件是否須要合併)
set mapred.min.split.size.per.node=100000000;
//一個交換機下split的至少的大小(這個值決定了多個交換機上的文件是否須要合併) 
set mapred.min.split.size.per.rack=100000000;
//執行Map前進行小文件合併
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

設置map輸出和reduce輸出進行合併的相關參數:

//設置map端輸出進行合併,默認爲true
set hive.merge.mapfiles = true
//設置reduce端輸出進行合併,默認爲false
set hive.merge.mapredfiles = true
//設置合併文件的大小
set hive.merge.size.per.task = 256*1000*1000
//當輸出文件的平均大小小於該值時,啓動一個獨立的MapReduce任務進行文件merge。
set hive.merge.smallfiles.avgsize=16000000

2、hive壓縮存儲優化

一、壓縮優化

 1)壓縮緣由

  hive最終是轉爲MapReduce程序來執行的,而MapReduce的性能瓶頸在於網絡IO和磁盤IO,要解決性能瓶頸,最主要的是減小數據量,對數據進行壓縮是個好的方式。可是在壓縮和解壓過程當中會增長CPU的開銷。不過每每性能瓶頸不在於CPU,因此針對IO密集型的jobs(非計算密集型)可使用壓縮的方式提升性能

 2)壓縮方式

在這裏插入圖片描述  各個壓縮方式所對應的 Class 類:
在這裏插入圖片描述

 3)壓縮方式的選擇

壓縮率
壓縮解壓縮速度
是否支持spllit

 4)壓縮的使用

Job輸出文件按照block以Gzip的方式進行壓縮:

set mapreduce.output.fileoutputformat.compress=true // 默認值是 false
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec 
// 默認值是 org.apache.hadoop.io.compress.DefaultCodec

Map輸出結果也以Gzip進行壓縮:

set mapred.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec 

對Hive輸出結果和中間都進行壓縮:

set hive.exec.compress.output=true // 默認值是 false,不壓縮
set hive.exec.compress.intermediate=true // 默認值是 false,爲 true 時 MR 設置的壓縮才啓用

二、存儲優化

  可使用列裁剪,分區裁剪,orc,parquet等這些列式存儲格式,由於列式存儲的表,每一列的數據在物理上是存儲在一塊兒的,Hive查詢時會只遍歷須要列數據,大大減小處理的數據量。

 1)存儲格式

  1. TextFile

  Hive數據表的默認格式,存儲方式:行存儲。
  可使用Gzip壓縮算法,但壓縮後的文件不支持split 在反序列化過程當中,必須逐個字符判斷是否是分隔符和行結束符,所以反序列化開銷會比SequenceFile高几十倍。

  1. Sequence Files

  Hadoop中有些原生壓縮文件的缺點之一就是不支持分割。支持分割的文件能夠並行的有多個mapper程序處理大數據文件,大多數文件不支持可分割是由於這些文件只能從頭開始讀。Sequence File是可分割的文件格式,支持Hadoop的block級壓縮。 Hadoop API提供的一種二進制文件,以key-value的形式序列化到文件中。存儲方式:行存儲。 sequencefile支持三種壓縮選擇:NONE,RECORD,BLOCK。Record壓縮率低,RECORD是默認選項,一般BLOCK會帶來較RECORD更好的壓縮性能。 優點是文件和hadoop api中的MapFile是相互兼容的。

  1. RCFile

  存儲方式:數據按行分塊,每塊按列存儲。結合了行存儲和列存儲的優勢:
  首先,RCFile 保證同一行的數據位於同一節點,所以元組重構的開銷很低 其次,像列存儲同樣,RCFile 可以利用列維度的數據壓縮,而且能跳過沒必要要的列讀取 數據追加:RCFile不支持任意方式的數據寫操做,僅提供一種追加接口,這是由於底層的 HDFS當前僅僅支持數據追加寫文件尾部。 行組大小:行組變大有助於提升數據壓縮的效率,可是可能會損害數據的讀取性能,由於這樣增長了 Lazy 解壓性能的消耗。並且行組變大會佔用更多的內存,這會影響併發執行的其餘MR做業。

  1. ORCFile

  存儲方式:數據按行分塊,每塊按照列存儲。
  壓縮快,快速列存取。效率比rcfile高,是rcfile的改良版本。

  1. Parquet

  Parquet也是一種行式存儲,同時具備很好的壓縮性能;同時能夠減小大量的表掃描和反序列化的時間。

  1. 自定義格式

  能夠自定義文件格式,用戶可經過實現InputFormat和OutputFormat來自定義輸入輸出格式。

結論:通常選擇ORCFile/parquet + snappy 的方式

create table tablename (
 xxx,string
 xxx, bigint
)
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")

3、hive表設計優化

一、內部表 & 外部表

區別:
  (1)建立表時指定external關鍵字,就是外部表,不指定external就是內部表(也稱管理表)。

  (2)內部表刪除後把元數據和數據都刪除了,外部表刪除後只是刪除了元數據,不會刪除hdfs上的數據文件。

  (3)外部表建立表時經過location指定存放表數據的hdfs上的路徑,而內部表是默認存放在hive-site.xml中。

二、分區

分區主要用於提升性能

  1.分區列的值將表劃分爲segments(文件夾)

  2.查詢時使用「分區」列和常規列相似

  3.查詢時Hive自動過濾掉不用於提升性能的分區

 1)靜態分區

建表時經過PARTITIONED BY定義分區

CREATE TABLE employee_partitioned(
    name string,
    work_place ARRAY<string>,
    sex_age STRUCT<sex:string,age:int>,
    skills_score MAP<string,int>,
    depart_title MAP<STRING,ARRAY<STRING>> )
PARTITIONED BY (year INT, month INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';

ALTER TABLE的方式添加靜態分區,ADD添加分區, DROP刪除分區

ALTER TABLE employee_partitioned ADD 
PARTITION (year=2019,month=3) PARTITION (year=2019,month=4); 
ALTER TABLE employee_partitioned DROP PARTITION (year=2019, month=4);

 2)動態分區

使用動態分區需設定屬性

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

動態分區設置方法

insert into table employee_partitioned partition(year, month)
select name,array('Toronto') as work_place,
named_struct("sex","male","age",30) as sex_age,
map("python",90) as skills_score,
map("r&d", array('developer')) as depart_title,
year(start_date) as year,month(start_date) as month
from employee_hr eh ;

  默認狀況下,用戶必須指定至少一個靜態分區列。這是爲了不意外地覆蓋分區。
  使用動態分區需禁用此限制,能夠將分區模式從默認的嚴格模式設置爲非嚴格模式

三、分桶

  桶是更細粒度的劃分, 相同的數據分到一個桶裏面,減小數據訪問的量,對每個表或者分區,hive能夠進行進一步的分桶。
  1.分桶對應於HDFS中的文件
  2.更高的查詢處理效率
  3.使抽樣(sampling)更高效
  4.根據「桶列」的哈希函數將數據進行分桶
  5.分桶只有動態分桶




SET hive.enforce.bucketing = true;

定義分桶

CLUSTERED BY (employee_id) INTO 2 BUCKETS  //分桶的列employee_id是表中已有的列

  1.分桶數最好是2的n次方
  2.必須使用INSERT方式加載數據到設置分桶的表,纔會生效
  3.與分區不一樣,分桶列名出如今列定義中

4、SQL優化

一、小表join大表

  新的版本當中已經沒有區別了,舊的版本當中須要使用小表。
  在使用寫有join操做的查詢語句時有一條原則:應該將條目少的表/子查詢放在join操做符的左邊。緣由是在Join操做的Reduce階段,位於join操做符左邊的表的內容會被加載進內存,將條目少的表放在左邊,能夠有效減小發生OOM錯誤的概率;再進一步,可使用Group讓小的維度表(1000條如下的記錄條數)先進內存。在map端完成reduce。
  多個表關聯時,最好分拆成小段,避免大sql(沒法控制中間Job)。

二、大表join大表

 1)空key過濾

有時join超時是由於某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而致使內存不夠。此時咱們應該仔細分析這些異常的key,不少狀況下,這些key對應的數據是異常數據,咱們須要在SQL語句中進行過濾。例如key對應的字段爲空。對好比下:

  • 不過濾
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
結果:
No rows affected (152.135 seconds)
  • 過濾
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
結果:
No rows affected (141.585 seconds)

 2)空key轉換

  問題:日誌中常會出現信息丟失,好比每日約爲20億的全網日誌,其中的user_id爲主鍵,在日誌收集過程當中會丟失,出現主鍵爲null的狀況,若是取其中的user_id和bmw_users關聯,就會碰到數據傾斜的問題。緣由是Hive中,主鍵爲null值的項會被當作相同的Key而分配進同一個計算Map。

解決方法1:user_id爲空的不參與關聯,子查詢過濾null

SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL

解決方法2:函數過濾null

SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;

  調優結果:原先因爲數據傾斜致使運行時長超過1小時,解決方法1運行每日平均時長25分鐘,解決方法2運行的每日平均時長在20分鐘左右。優化效果很明顯。

  解決方法2比解決方法1效果更好,不但IO少了,並且做業數也少了。解決方法1中log讀取兩次,job數爲2。解決方法2中job數是1。這個優化適合無效id(好比-99,‘’,null等)產生的傾斜問題。把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不一樣的Reduce上,從而解決數據傾斜問題。由於空值不參與關聯,即便分到不一樣的Reduce上,也不會影響最終的結果。附上Hadoop通用關聯的實現方法是:關聯經過二次排序實現的,關聯的列爲partition key,關聯的列和表的tag組成排序的group key,根據partition key分配Reduce。同一Reduce內根據group key排序。

三、mapjoin

  若是不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會將Join操做轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。能夠用MapJoin把小表所有加載到內存在map端進行join,避免reducer處理。

  • 開啓MapJoin參數設置:
1)設置自動選擇Mapjoin

set hive.auto.convert.join = true; 默認爲true2)大表小表的閾值設置(默認25M如下認爲是小表):

set hive.mapjoin.smalltable.filesize=25123456;

四、group by

  默認狀況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。
  並非全部的聚合操做都須要在Reduce端完成,不少聚合操做均可以先在Map端進行部分聚合,最後在Reduce端得出最終結果。

  • 開啓Map端聚合參數設置
1)是否在Map端進行聚合,默認爲True

   set hive.map.aggr = true;2)在Map端進行聚合操做的條目數目

   set hive.groupby.mapaggr.checkinterval = 100000;3)有數據傾斜的時候進行負載均衡(默認是falseset hive.groupby.skewindata = true;

  當選項設定爲 true,生成的查詢計劃會有兩個MapReduce Job。
  第一個MapReduce Job中,Map的輸出結果會隨機分佈到Reduce中,每一個Reduce作部分聚合操做,並輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不一樣的Reduce中,從而達到負載均衡的目的;
  第二個MapReduce Job再根據預處理的數據結果按照Group By Key分佈到Reduce中(這個過程能夠保證相同的Group By Key被分佈到同一個Reduce中),最後完成最終的聚合操做。

五、笛卡爾積

  儘可能避免笛卡爾積,join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積。
  當 Hive 設定爲嚴格模式(hive.mapred.mode=strict)時,不容許在 HQL 語句中出現笛卡爾積, 這實際說明了 Hive 對笛卡爾積支持較弱。由於找不到 Join key,Hive 只能使用 1 個 reducer 來完成笛卡爾積。
  固然也可使用 limit 的辦法來減小某個表參與 join 的數據量,但對於須要笛卡爾積語義的需求來講,常常是一個大表和一個小表的 Join 操做,結果仍然很大(以致於沒法用單機處理),這時 MapJoin纔是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操做。 這須要將 Join 操做的一個或多個表徹底讀入內存。
  MapJoin 在子查詢中可能出現未知 BUG。在大表和小表作笛卡爾積時,規避笛卡爾積的方法是:給 Join 添加一個 Join key,原理很簡單:將小表擴充一列 join key,並將小表的條目複製數倍,join key 各不相同;將大表擴充一列 join key 爲隨機數。精髓就在於複製幾倍,最後就有幾個 reduce 來作,並且大表的數據是前面小表擴張 key 值 範圍裏面隨機出來的,因此複製了幾倍 n,就至關於這個隨機範圍就有多大 n,那麼相應的, 大表的數據就被隨機的分爲了 n 份。而且最後處理所用的 reduce 數量也是 n,並且也不會出現數據傾斜。


六、count(distinct)去重統計

  數據量小的時候無所謂,數據量大的狀況下,因爲COUNT DISTINCT操做須要用一個Reduce Task來完成,這一個Reduce須要處理的數據量太大,容易產生傾斜問題,就會致使整個Job很難完成。通常COUNT DISTINCT使用先GROUP BY再COUNT的方式替換。

七、行列過濾

一、列處理

  在 SELECT 中,只拿須要的列,若是有,儘可能使用分區過濾,少用 SELECT *。
二、行處理

  在分區剪裁中,當使用外關聯時,過濾條件最好用在子查詢裏。

(1)先關聯兩張表,再用 where 條件過濾 (不推薦)

select o.id from bigtable b join ori o on o.id = b.id where o.id <= 10;

(2)優化後,經過子查詢後,再關聯表 (推薦)

select b.id from bigtable b join (select id from ori where id <= 10 ) o on b.id = o.id;

八、排序選擇

cluster by:對同一字段分桶並排序,不能和 sort by 連用。

distribute by + sort by:分桶,保證同一字段值只存在一個結果文件當中,結合 sort by 保證 每一個 reduceTask 結果有序。

sort by:單機排序,單個 reduce 結果有序。

order by:全局排序,缺陷是隻能使用一個 reduce。

九、查看sql的執行計劃(Explain)

1)基本語法

Explain [extended | dependency | authorization] query

2)案例實操

(1)查看下面這條語句的執行計劃

hive (default)> explain select * from emp;

(2)查看詳細執行計劃

hive (default)> explain extended select * from emp;

5、數據傾斜

  表現:任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少許(1個或幾個)reduce子任務未完成。由於其處理的數據量和其餘reduce差別過大。

  緣由:某個reduce的數據輸入量遠遠大於其餘reduce數據的輸入量

1)、key分佈不均勻

2)、業務數據自己的特性

3)、建表時考慮不周

4)、某些SQL語句自己就有數據傾斜

關鍵詞 情形 後果
join 其中一個表較小,可是key集中 分發到某一個或幾個Reduce上的數據遠高於平均值
join 大表與大表,可是分桶的判斷字段0值或空值過多 這些空值都由一個reduce處理,很是慢
group by group by 維度太小,某值的數量過多 處理某值的reduce很是耗時
count distinct 某特殊值過多 處理此特殊值reduce耗時

解決方案:

(1)參數調節

set hive.map.aggr=true
set hive.groupby.skewindata=true

(2) 熟悉數據的分佈,優化sql的邏輯,找出數據傾斜的緣由。

相關文章
相關標籤/搜索