從0開始學大數據-Hive性能優化篇

在工做中使用hive比較多,也寫了不少HiveQL。這裏從三個方面對 Hive 經常使用的一些性能優化進行了總結。算法

表設計層面優化

利用分區表優化

分區表 是在某一個或者幾個維度上對數據進行分類存儲,一個分區對應一個目錄。若是篩選條件裏有分區字段,那麼 Hive 只須要遍歷對應分區目錄下的文件便可,不須要遍歷全局數據,使得處理的數據量大大減小,從而提升查詢效率。sql

當一個 Hive 表的查詢大多數狀況下,會根據某一個字段進行篩選時,那麼很是適合建立爲分區表。apache

利用桶表優化

指定桶的個數後,存儲數據時,根據某一個字段進行哈希後,肯定存儲在哪一個桶裏,這樣作的目的和分區表相似,也是使得篩選時不用全局遍歷全部的數據,只須要遍歷所在桶就能夠了。性能優化

選擇合適的文件存儲格式

Apache Hive 支持 Apache Hadoop 中使用的幾種熟悉的文件格式。網絡

TextFile 默認格式,若是建表時不指定默認爲此格式。架構

存儲方式:行存儲。併發

每一行都是一條記錄,每行都以換行符\n結尾。數據不作壓縮時,磁盤會開銷比較大,數據解析開銷也比較大。app

可結合 GzipBzip2 等壓縮方式一塊兒使用(系統會自動檢查,查詢時會自動解壓),但對於某些壓縮算法 hive 不會對數據進行切分,從而沒法對數據進行並行操做。負載均衡

SequenceFilejvm

一種Hadoop API 提供的二進制文件,使用方便、可分割、個壓縮的特色。

支持三種壓縮選擇:NONE、RECORD、BLOCK。RECORD壓縮率低,通常建議使用BLOCK壓縮。

RCFile

存儲方式:數據按行分塊,每塊按照列存儲 。

  • 首先,將數據按行分塊,保證同一個record在一個塊上,避免讀一個記錄須要讀取多個block。
  • 其次,塊數據列式存儲,有利於數據壓縮和快速的列存取。

ORC

存儲方式:數據按行分塊,每塊按照列存儲

Hive 提供的新格式,屬於 RCFile 的升級版,性能有大幅度提高,並且數據能夠壓縮存儲,壓縮快,快速列存取。

Parquet

存儲方式:列式存儲

Parquet 對於大型查詢的類型是高效的。對於掃描特定表格中的特定列查詢,Parquet特別有用。Parquet通常使用 Snappy、Gzip 壓縮。默認 Snappy。

Parquet 支持 Impala 查詢引擎。

表的文件存儲格式儘可能採用 ParquetORC,不只下降存儲量,還優化了查詢,壓縮,表關聯等性能;

選擇合適的壓縮方式

Hive 語句最終是轉化爲 MapReduce 程序來執行的,而 MapReduce 的性能瓶頸在與 網絡IO磁盤IO,要解決性能瓶頸,最主要的是 減小數據量,對數據進行壓縮是個好方式。壓縮雖然是減小了數據量,可是壓縮過程要消耗CPU,可是在Hadoop中,每每性能瓶頸不在於CPU,CPU壓力並不大,因此壓縮充分利用了比較空閒的CPU。

經常使用壓縮算法對比

如何選擇壓縮方式

  1. 壓縮比率
  2. 壓縮解壓速度
  3. 是否支持split

支持分割的文件能夠並行的有多個 mapper 程序處理大數據文件,大多數文件不支持可分割是由於這些文件只能從頭開始讀。

語法和參數層面優化

列裁剪

Hive 在讀數據的時候,能夠只讀取查詢中所須要用到的列,而忽略其餘的列。這樣作能夠節省讀取開銷,中間表存儲開銷和數據整合開銷。 

set hive.optimize.cp = true; -- 列裁剪,取數只取查詢中須要用到的列,默認爲真
複製代碼

分區裁剪

在查詢的過程當中只選擇須要的分區,能夠減小讀入的分區數目,減小讀入的數據量。

set hive.optimize.pruner=true; // 默認爲true
複製代碼

合併小文件

Map 輸入合併

在執行 MapReduce 程序的時候,通常狀況是一個文件須要一個 mapper 來處理。可是若是數據源是大量的小文件,這樣豈不是會啓動大量的 mapper 任務,這樣會浪費大量資源。能夠將輸入的小文件進行合併,從而減小mapper任務數量。詳細分析

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端輸入、合併文件以後按照block的大小分割(默認)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端輸入,不合並
複製代碼

Map/Reduce輸出合併

大量的小文件會給 HDFS 帶來壓力,影響處理效率。能夠經過合併 Map 和 Reduce 的結果文件來消除影響。

set hive.merge.mapfiles=true;  -- 是否合併Map輸出文件, 默認值爲真
set hive.merge.mapredfiles=true; -- 是否合併Reduce 端輸出文件,默認值爲假
set hive.merge.size.per.task=25610001000; -- 合併文件的大小,默認值爲 256000000
複製代碼

合理控制 map/reduce 任務數量

合理控制 mapper 數量

減小 mapper 數能夠經過合併小文件來實現 增長 mapper 數能夠經過控制上一個 reduce

默認的 mapper 個數計算方式

輸入文件總大小:total_size     
hdfs 設置的數據塊大小:dfs_block_size       
default_mapper_num = total_size/dfs_block_size
複製代碼

MapReduce 中提供了以下參數來控制 map 任務個數:

set mapred.map.tasks=10;
複製代碼

從字面上看,貌似是能夠直接設置 mapper 個數的樣子,可是很遺憾不行,這個參數設置只有在大於default_mapper_num的時候,纔會生效。

那若是咱們須要減小 mapper 數量,可是文件大小是固定的,那該怎麼辦呢?

能夠經過mapred.min.split.size設置每一個任務處理的文件的大小,這個大小隻有在大於dfs_block_size的時候纔會生效

split_size=max(mapred.min.split.size, dfs_block_size)          
split_num=total_size/split_size                   
compute_map_num = min(split_num,  max(default_mapper_num, mapred.map.tasks))
複製代碼

這樣就能夠減小mapper數量了。

總結一下控制 mapper 個數的方法:

  • 若是想增長 mapper 個數,能夠設置mapred.map.tasks爲一個較大的值
  • 若是想減小 mapper 個數,能夠設置maperd.min.split.size爲一個較大的值
  • 若是輸入是大量小文件,想減小 mapper 個數,能夠經過設置hive.input.format合併小文件

若是想要調整 mapper 個數,在調整以前,須要肯定處理的文件大概大小以及文件的存在形式(是大量小文件,仍是單個大文件),而後再設置合適的參數。

合理控制reducer數量

若是 reducer 數量過多,一個 reducer 會產生一個結數量果文件,這樣就會生成不少小文件,那麼若是這些結果文件會做爲下一個 job 的輸入,則會出現小文件須要進行合併的問題,並且啓動和初始化 reducer 須要耗費和資源。

若是 reducer 數量過少,這樣一個 reducer 就須要處理大量的數據,而且還有可能會出現數據傾斜的問題,使得整個查詢耗時長。 默認狀況下,hive 分配的 reducer 個數由下列參數決定:

  • 參數1:hive.exec.reducers.bytes.per.reducer(默認1G)
  • 參數2:hive.exec.reducers.max(默認爲999)

reducer的計算公式爲:

N = min(參數2, 總輸入數據量/參數1)
複製代碼

能夠經過改變上述兩個參數的值來控制reducer的數量。 也能夠經過

set mapred.map.tasks=10; 
複製代碼

直接控制reducer個數,若是設置了該參數,上面兩個參數就會忽略。

Join優化

優先過濾數據

儘可能減小每一個階段的數據量,對於分區表能用上分區字段的儘可能使用,同時只選擇後面須要使用到的列,最大限度的減小參與 join 的數據量。

小表 join 大表原則

小表 join 大表的時應遵照小表 join 大表原則,緣由是 join 操做的 reduce 階段,位於 join 左邊的表內容會被加載進內存,將條目少的表放在左邊,能夠有效減小發生內存溢出的概率。join 中執行順序是從左到右生成 Job,應該保證連續查詢中的表的大小從左到右是依次增長的。

使用相同的鏈接鍵

在 hive 中,當對 3 個或更多張表進行 join 時,若是 on 條件使用相同字段,那麼它們會合併爲一個 MapReduce Job,利用這種特性,能夠將相同的 join on 的放入一個 job 來節省執行時間。

啓用 mapjoin

mapjoin 是將 join 雙方比較小的表直接分發到各個 map 進程的內存中,在 map 進程中進行 join 操做,這樣就不用進行 reduce 步驟,從而提升了速度。只有 join 操做才能啓用 mapjoin。

set hive.auto.convert.join = true; -- 是否根據輸入小表的大小,自動將reduce端的common join 轉化爲map join,將小表刷入內存中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入內存表的大小(字節)
set hive.mapjoin.maxsize=1000000;  -- Map Join所處理的最大的行數。超過此行數,Map Join進程會異常退出
複製代碼

儘可能原子操做

儘可能避免一個SQL包含複雜的邏輯,可使用中間表來完成複雜的邏輯。

桶表 mapjoin

當兩個分桶表 join 時,若是 join on的是分桶字段,小表的分桶數是大表的倍數時,能夠啓用 mapjoin 來提升效率。

set hive.optimize.bucketmapjoin = true; -- 啓用桶表 map join
複製代碼

Group By 優化

默認狀況下,Map階段同一個Key的數據會分發到一個Reduce上,當一個Key的數據過大時會產生 數據傾斜。進行group by操做時能夠從如下兩個方面進行優化:

1. Map端部分聚合

事實上並非全部的聚合操做都須要在 Reduce 部分進行,不少聚合操做均可以先在 Map 端進行部分聚合,而後在 Reduce 端的得出最終結果。

set hive.map.aggr=true; -- 開啓Map端聚合參數設置

set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端進行聚合操做的條目數目
複製代碼

2. 有數據傾斜時進行負載均衡

set hive.groupby.skewindata = true; -- 有數據傾斜的時候進行負載均衡(默認是false) 
複製代碼

當選項設定爲 true 時,生成的查詢計劃有兩個 MapReduce 任務。在第一個 MapReduce 任務中,map 的輸出結果會隨機分佈到 reduce 中,每一個 reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的group by key有可能分發到不一樣的 reduce 中,從而達到負載均衡的目的;第二個 MapReduce 任務再根據預處理的數據結果按照group by key分佈到各個 reduce 中,最後完成最終的聚合操做。

Order By 優化

order by只能是在一個reduce進程中進行,因此若是對一個大數據集進行order by,會致使一個reduce進程中處理的數據至關大,形成查詢執行緩慢。

  • 在最終結果上進行order by,不要在中間的大數據集上進行排序。若是最終結果較少,能夠在一個reduce上進行排序時,那麼就在最後的結果集上進行order by
  • 若是是去排序後的前N條數據,可使用distribute bysort by在各個reduce上進行排序後前N條,而後再對各個reduce的結果集合合併後在一個reduce中全局排序,再取前N條,由於參與全局排序的order by的數據量最可能是reduce個數 * N,因此執行效率很高。

COUNT DISTINCT優化

-- 優化前(只有一個reduce,先去重再count負擔比較大):
select count(distinct id) from tablename;

-- 優化後(啓動兩個job,一個job負責子查詢(能夠有多個reduce),另外一個job負責count(1)):
select count(1) from (select distinct id from tablename) tmp;
複製代碼

一次讀取屢次插入

有些場景是從一張表讀取數據後,要屢次利用,這時可使用multi insert語法:

from sale_detail
  insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' ) 
  select shop_name, customer_id, total_price where .....
  insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
  select shop_name, customer_id, total_price where .....;
複製代碼

說明:

  • 通常狀況下,單個SQL中最多能夠寫128路輸出,超過128路,則報語法錯誤。
  • 在一個multi insert中:
    • 對於分區表,同一個目標分區不容許出現屢次。
    • 對於未分區表,該表不能出現屢次。
  • 對於同一張分區表的不一樣分區,不能同時有insert overwriteinsert into操做,不然報錯返回。

啓用壓縮

map 輸出壓縮

set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
複製代碼

中間數據壓縮

中間數據壓縮就是對 hive 查詢的多個 job 之間的數據進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。能夠採用snappy壓縮算法,該算法的壓縮和解壓效率都很是高。

set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
複製代碼

結果數據壓縮

最終的結果數據(Reducer輸出數據)也是能夠進行壓縮的,能夠選擇一個壓縮效果比較好的,能夠減小數據的大小和數據的磁盤讀寫時間; 注:經常使用的gzip,snappy壓縮算法是不支持並行處理的,若是數據源是gzip/snappy壓縮文件大文件,這樣只會有有個mapper來處理這個文件,會嚴重影響查詢效率。 因此若是結果數據須要做爲其餘查詢任務的數據源,能夠選擇支持splitable的LZO算法,這樣既能對結果文件進行壓縮,還能夠並行的處理,這樣就能夠大大的提升job執行的速度了。關於如何給Hadoop集羣安裝LZO壓縮庫能夠查看這篇文章

set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;  
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
複製代碼

Hadoop集羣支持一下算法:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.GzipCodec
  • org.apache.hadoop.io.compress.BZip2Codec
  • org.apache.hadoop.io.compress.DeflateCodec
  • org.apache.hadoop.io.compress.SnappyCodec
  • org.apache.hadoop.io.compress.Lz4Codec
  • com.hadoop.compression.lzo.LzoCodec
  • com.hadoop.compression.lzo.LzopCodec

Hive架構層面優化

啓用直接抓取

Hive 從 HDFS 中讀取數據,有兩種方式:啓用 MapReduce 讀取、直接抓取。

直接抓取數據比 MapReduce 方式讀取數據要快的多,可是隻有少數操做可使用直接抓取方式。

能夠經過hive.fetch.task.conversion參數來配置在什麼狀況下采用直接抓取方式:

  • minimal:只有 select * 、在分區字段上 where 過濾、有 limit 這三種場景下才啓用直接抓取方式。
  • more:在 selectwhere 篩選、limit 時,都啓用直接抓取方式。
set hive.fetch.task.conversion=more; -- 啓用fetch more模式
複製代碼

本地化執行

Hive 在集羣上查詢時,默認是在集羣上多臺機器上運行,須要多個機器進行協調運行,這種方式很好的解決了大數據量的查詢問題。可是在Hive查詢處理的數據量比較小的時候,其實沒有必要啓動分佈式模式去執行,由於以分佈式方式執行設計到跨網絡傳輸、多節點協調等,而且消耗資源。對於小數據集,能夠經過本地模式,在單臺機器上處理全部任務,執行時間明顯被縮短。

set hive.exec.mode.local.auto=true; -- 打開hive自動判斷是否啓動本地模式的開關
set hive.exec.mode.local.auto.input.files.max=4; -- map任務數最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map輸入文件最大大小
複製代碼

JVM重用

Hive 語句最終會轉換爲一系列的 MapReduce 任務,每個MapReduce 任務是由一系列的Map Task 和 Reduce Task 組成的,默認狀況下,MapReduce 中一個 Map Task 或者 Reduce Task 就會啓動一個 JVM 進程,一個 Task 執行完畢後,JVM進程就會退出。這樣若是任務花費時間很短,又要屢次啓動 JVM 的狀況下,JVM的啓動時間會變成一個比較大的消耗,這時,能夠經過重用 JVM 來解決。

set mapred.job.reuse.jvm.num.tasks=5;
複製代碼

JVM也是有缺點的,開啓JVM重用會一直佔用使用到的 task 的插槽,以便進行重用,直到任務完成後纔會釋放。若是某個不平衡的job中有幾個 reduce task 執行的時間要比其餘的 reduce task 消耗的時間要多得多的話,那麼保留的插槽就會一直空閒卻沒法被其餘的 job 使用,直到全部的 task 都結束了纔會釋放。

並行執行

有的查詢語句,hive會將其轉化爲一個或多個階段,包括:MapReduce 階段、抽樣階段、合併階段、limit 階段等。默認狀況下,一次只執行一個階段。可是,若是某些階段不是互相依賴,是能夠並行執行的。多階段並行是比較耗系統資源的。

set hive.exec.parallel=true;  -- 能夠開啓併發執行。
set hive.exec.parallel.thread.number=16;  -- 同一個sql容許最大並行度,默認爲8。
複製代碼

推測執行

在分佈式集羣環境下,由於程序Bug(包括Hadoop自己的bug),負載不均衡或者資源分佈不均等緣由,會形成同一個做業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢於其餘任務(好比一個做業的某個任務進度只有50%,而其餘全部任務已經運行完畢),則這些任務會拖慢做業的總體執行進度。爲了不這種狀況發生,Hadoop採用了推測執行(Speculative Execution)機制,它根據必定的法則推測出「拖後腿」的任務,併爲這樣的任務啓動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最早成功運行完成任務的計算結果做爲最終結果。

set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;
複製代碼

建議:

若是用戶對於運行時的誤差很是敏感的話,那麼能夠將這些功能關閉掉。若是用戶由於輸入數據量很大而須要執行長時間的map或者Reduce task的話,那麼啓動推測執行形成的浪費是很是巨大大。



擴展閱讀

相關文章
相關標籤/搜索