在工做中使用hive比較多,也寫了不少HiveQL。這裏從三個方面對 Hive 經常使用的一些性能優化進行了總結。算法
分區表 是在某一個或者幾個維度上對數據進行分類存儲,一個分區對應一個目錄。若是篩選條件裏有分區字段,那麼 Hive 只須要遍歷對應分區目錄下的文件便可,不須要遍歷全局數據,使得處理的數據量大大減小,從而提升查詢效率。sql
當一個 Hive 表的查詢大多數狀況下,會根據某一個字段進行篩選時,那麼很是適合建立爲分區表。apache
指定桶的個數後,存儲數據時,根據某一個字段進行哈希後,肯定存儲在哪一個桶裏,這樣作的目的和分區表相似,也是使得篩選時不用全局遍歷全部的數據,只須要遍歷所在桶就能夠了。性能優化
Apache Hive 支持 Apache Hadoop 中使用的幾種熟悉的文件格式。網絡
TextFile 默認格式,若是建表時不指定默認爲此格式。架構
存儲方式:行存儲。併發
每一行都是一條記錄,每行都以換行符\n
結尾。數據不作壓縮時,磁盤會開銷比較大,數據解析開銷也比較大。app
可結合 Gzip、Bzip2 等壓縮方式一塊兒使用(系統會自動檢查,查詢時會自動解壓),但對於某些壓縮算法 hive 不會對數據進行切分,從而沒法對數據進行並行操做。負載均衡
SequenceFilejvm
一種Hadoop API 提供的二進制文件,使用方便、可分割、個壓縮的特色。
支持三種壓縮選擇:NONE、RECORD、BLOCK。RECORD壓縮率低,通常建議使用BLOCK壓縮。
RCFile
存儲方式:數據按行分塊,每塊按照列存儲 。
ORC
存儲方式:數據按行分塊,每塊按照列存儲
Hive 提供的新格式,屬於 RCFile 的升級版,性能有大幅度提高,並且數據能夠壓縮存儲,壓縮快,快速列存取。
Parquet
存儲方式:列式存儲
Parquet 對於大型查詢的類型是高效的。對於掃描特定表格中的特定列查詢,Parquet特別有用。Parquet通常使用 Snappy、Gzip 壓縮。默認 Snappy。
Parquet 支持 Impala 查詢引擎。
表的文件存儲格式儘可能採用 Parquet 或 ORC,不只下降存儲量,還優化了查詢,壓縮,表關聯等性能;
Hive 語句最終是轉化爲 MapReduce 程序來執行的,而 MapReduce 的性能瓶頸在與 網絡IO 和 磁盤IO,要解決性能瓶頸,最主要的是 減小數據量,對數據進行壓縮是個好方式。壓縮雖然是減小了數據量,可是壓縮過程要消耗CPU,可是在Hadoop中,每每性能瓶頸不在於CPU,CPU壓力並不大,因此壓縮充分利用了比較空閒的CPU。
經常使用壓縮算法對比
如何選擇壓縮方式
支持分割的文件能夠並行的有多個 mapper 程序處理大數據文件,大多數文件不支持可分割是由於這些文件只能從頭開始讀。
Hive 在讀數據的時候,能夠只讀取查詢中所須要用到的列,而忽略其餘的列。這樣作能夠節省讀取開銷,中間表存儲開銷和數據整合開銷。
set hive.optimize.cp = true; -- 列裁剪,取數只取查詢中須要用到的列,默認爲真
複製代碼
在查詢的過程當中只選擇須要的分區,能夠減小讀入的分區數目,減小讀入的數據量。
set hive.optimize.pruner=true; // 默認爲true
複製代碼
在執行 MapReduce 程序的時候,通常狀況是一個文件須要一個 mapper 來處理。可是若是數據源是大量的小文件,這樣豈不是會啓動大量的 mapper 任務,這樣會浪費大量資源。能夠將輸入的小文件進行合併,從而減小mapper任務數量。詳細分析
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端輸入、合併文件以後按照block的大小分割(默認)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端輸入,不合並
複製代碼
大量的小文件會給 HDFS 帶來壓力,影響處理效率。能夠經過合併 Map 和 Reduce 的結果文件來消除影響。
set hive.merge.mapfiles=true; -- 是否合併Map輸出文件, 默認值爲真
set hive.merge.mapredfiles=true; -- 是否合併Reduce 端輸出文件,默認值爲假
set hive.merge.size.per.task=25610001000; -- 合併文件的大小,默認值爲 256000000
複製代碼
減小 mapper 數能夠經過合併小文件來實現 增長 mapper 數能夠經過控制上一個 reduce
默認的 mapper 個數計算方式
輸入文件總大小:total_size
hdfs 設置的數據塊大小:dfs_block_size
default_mapper_num = total_size/dfs_block_size
複製代碼
MapReduce 中提供了以下參數來控制 map 任務個數:
set mapred.map.tasks=10;
複製代碼
從字面上看,貌似是能夠直接設置 mapper 個數的樣子,可是很遺憾不行,這個參數設置只有在大於default_mapper_num
的時候,纔會生效。
那若是咱們須要減小 mapper 數量,可是文件大小是固定的,那該怎麼辦呢?
能夠經過mapred.min.split.size
設置每一個任務處理的文件的大小,這個大小隻有在大於dfs_block_size
的時候纔會生效
split_size=max(mapred.min.split.size, dfs_block_size)
split_num=total_size/split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
複製代碼
這樣就能夠減小mapper數量了。
總結一下控制 mapper 個數的方法:
mapred.map.tasks
爲一個較大的值maperd.min.split.size
爲一個較大的值hive.input.format
合併小文件若是想要調整 mapper 個數,在調整以前,須要肯定處理的文件大概大小以及文件的存在形式(是大量小文件,仍是單個大文件),而後再設置合適的參數。
若是 reducer 數量過多,一個 reducer 會產生一個結數量果文件,這樣就會生成不少小文件,那麼若是這些結果文件會做爲下一個 job 的輸入,則會出現小文件須要進行合併的問題,並且啓動和初始化 reducer 須要耗費和資源。
若是 reducer 數量過少,這樣一個 reducer 就須要處理大量的數據,而且還有可能會出現數據傾斜的問題,使得整個查詢耗時長。 默認狀況下,hive 分配的 reducer 個數由下列參數決定:
hive.exec.reducers.bytes.per.reducer
(默認1G)hive.exec.reducers.max
(默認爲999)reducer的計算公式爲:
N = min(參數2, 總輸入數據量/參數1)
複製代碼
能夠經過改變上述兩個參數的值來控制reducer的數量。 也能夠經過
set mapred.map.tasks=10;
複製代碼
直接控制reducer個數,若是設置了該參數,上面兩個參數就會忽略。
儘可能減小每一個階段的數據量,對於分區表能用上分區字段的儘可能使用,同時只選擇後面須要使用到的列,最大限度的減小參與 join 的數據量。
小表 join 大表的時應遵照小表 join 大表原則,緣由是 join 操做的 reduce 階段,位於 join 左邊的表內容會被加載進內存,將條目少的表放在左邊,能夠有效減小發生內存溢出的概率。join 中執行順序是從左到右生成 Job,應該保證連續查詢中的表的大小從左到右是依次增長的。
在 hive 中,當對 3 個或更多張表進行 join 時,若是 on 條件使用相同字段,那麼它們會合併爲一個 MapReduce Job,利用這種特性,能夠將相同的 join on 的放入一個 job 來節省執行時間。
mapjoin 是將 join 雙方比較小的表直接分發到各個 map 進程的內存中,在 map 進程中進行 join 操做,這樣就不用進行 reduce 步驟,從而提升了速度。只有 join 操做才能啓用 mapjoin。
set hive.auto.convert.join = true; -- 是否根據輸入小表的大小,自動將reduce端的common join 轉化爲map join,將小表刷入內存中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入內存表的大小(字節)
set hive.mapjoin.maxsize=1000000; -- Map Join所處理的最大的行數。超過此行數,Map Join進程會異常退出
複製代碼
儘可能避免一個SQL包含複雜的邏輯,可使用中間表來完成複雜的邏輯。
當兩個分桶表 join 時,若是 join on的是分桶字段,小表的分桶數是大表的倍數時,能夠啓用 mapjoin 來提升效率。
set hive.optimize.bucketmapjoin = true; -- 啓用桶表 map join
複製代碼
默認狀況下,Map階段同一個Key的數據會分發到一個Reduce上,當一個Key的數據過大時會產生 數據傾斜。進行group by
操做時能夠從如下兩個方面進行優化:
1. Map端部分聚合
事實上並非全部的聚合操做都須要在 Reduce 部分進行,不少聚合操做均可以先在 Map 端進行部分聚合,而後在 Reduce 端的得出最終結果。
set hive.map.aggr=true; -- 開啓Map端聚合參數設置
set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端進行聚合操做的條目數目
複製代碼
2. 有數據傾斜時進行負載均衡
set hive.groupby.skewindata = true; -- 有數據傾斜的時候進行負載均衡(默認是false)
複製代碼
當選項設定爲 true 時,生成的查詢計劃有兩個 MapReduce 任務。在第一個 MapReduce 任務中,map 的輸出結果會隨機分佈到 reduce 中,每一個 reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的group by key
有可能分發到不一樣的 reduce 中,從而達到負載均衡的目的;第二個 MapReduce 任務再根據預處理的數據結果按照group by key
分佈到各個 reduce 中,最後完成最終的聚合操做。
order by
只能是在一個reduce進程中進行,因此若是對一個大數據集進行order by
,會致使一個reduce進程中處理的數據至關大,形成查詢執行緩慢。
order by
,不要在中間的大數據集上進行排序。若是最終結果較少,能夠在一個reduce上進行排序時,那麼就在最後的結果集上進行order by
。distribute by
和sort by
在各個reduce上進行排序後前N條,而後再對各個reduce的結果集合合併後在一個reduce中全局排序,再取前N條,由於參與全局排序的order by
的數據量最可能是reduce個數 * N
,因此執行效率很高。-- 優化前(只有一個reduce,先去重再count負擔比較大):
select count(distinct id) from tablename;
-- 優化後(啓動兩個job,一個job負責子查詢(能夠有多個reduce),另外一個job負責count(1)):
select count(1) from (select distinct id from tablename) tmp;
複製代碼
有些場景是從一張表讀取數據後,要屢次利用,這時可使用multi insert
語法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
select shop_name, customer_id, total_price where .....;
複製代碼
說明:
- 通常狀況下,單個SQL中最多能夠寫128路輸出,超過128路,則報語法錯誤。
- 在一個multi insert中:
- 對於分區表,同一個目標分區不容許出現屢次。
- 對於未分區表,該表不能出現屢次。
- 對於同一張分區表的不一樣分區,不能同時有
insert overwrite
和insert into
操做,不然報錯返回。
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
複製代碼
中間數據壓縮就是對 hive 查詢的多個 job 之間的數據進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。能夠採用snappy
壓縮算法,該算法的壓縮和解壓效率都很是高。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
複製代碼
最終的結果數據(Reducer輸出數據)也是能夠進行壓縮的,能夠選擇一個壓縮效果比較好的,能夠減小數據的大小和數據的磁盤讀寫時間; 注:經常使用的gzip,snappy壓縮算法是不支持並行處理的,若是數據源是gzip/snappy壓縮文件大文件,這樣只會有有個mapper來處理這個文件,會嚴重影響查詢效率。 因此若是結果數據須要做爲其餘查詢任務的數據源,能夠選擇支持splitable的LZO
算法,這樣既能對結果文件進行壓縮,還能夠並行的處理,這樣就能夠大大的提升job執行的速度了。關於如何給Hadoop集羣安裝LZO壓縮庫能夠查看這篇文章。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
複製代碼
Hadoop集羣支持一下算法:
Hive 從 HDFS 中讀取數據,有兩種方式:啓用 MapReduce 讀取、直接抓取。
直接抓取數據比 MapReduce 方式讀取數據要快的多,可是隻有少數操做可使用直接抓取方式。
能夠經過hive.fetch.task.conversion
參數來配置在什麼狀況下采用直接抓取方式:
select *
、在分區字段上 where
過濾、有 limit
這三種場景下才啓用直接抓取方式。select
、where
篩選、limit
時,都啓用直接抓取方式。set hive.fetch.task.conversion=more; -- 啓用fetch more模式
複製代碼
Hive 在集羣上查詢時,默認是在集羣上多臺機器上運行,須要多個機器進行協調運行,這種方式很好的解決了大數據量的查詢問題。可是在Hive查詢處理的數據量比較小的時候,其實沒有必要啓動分佈式模式去執行,由於以分佈式方式執行設計到跨網絡傳輸、多節點協調等,而且消耗資源。對於小數據集,能夠經過本地模式,在單臺機器上處理全部任務,執行時間明顯被縮短。
set hive.exec.mode.local.auto=true; -- 打開hive自動判斷是否啓動本地模式的開關
set hive.exec.mode.local.auto.input.files.max=4; -- map任務數最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map輸入文件最大大小
複製代碼
Hive 語句最終會轉換爲一系列的 MapReduce 任務,每個MapReduce 任務是由一系列的Map Task 和 Reduce Task 組成的,默認狀況下,MapReduce 中一個 Map Task 或者 Reduce Task 就會啓動一個 JVM 進程,一個 Task 執行完畢後,JVM進程就會退出。這樣若是任務花費時間很短,又要屢次啓動 JVM 的狀況下,JVM的啓動時間會變成一個比較大的消耗,這時,能夠經過重用 JVM 來解決。
set mapred.job.reuse.jvm.num.tasks=5;
複製代碼
JVM也是有缺點的,開啓JVM重用會一直佔用使用到的 task 的插槽,以便進行重用,直到任務完成後纔會釋放。若是某個
不平衡的job
中有幾個 reduce task 執行的時間要比其餘的 reduce task 消耗的時間要多得多的話,那麼保留的插槽就會一直空閒卻沒法被其餘的 job 使用,直到全部的 task 都結束了纔會釋放。
有的查詢語句,hive會將其轉化爲一個或多個階段,包括:MapReduce 階段、抽樣階段、合併階段、limit 階段等。默認狀況下,一次只執行一個階段。可是,若是某些階段不是互相依賴,是能夠並行執行的。多階段並行是比較耗系統資源的。
set hive.exec.parallel=true; -- 能夠開啓併發執行。
set hive.exec.parallel.thread.number=16; -- 同一個sql容許最大並行度,默認爲8。
複製代碼
在分佈式集羣環境下,由於程序Bug(包括Hadoop自己的bug),負載不均衡或者資源分佈不均等緣由,會形成同一個做業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢於其餘任務(好比一個做業的某個任務進度只有50%,而其餘全部任務已經運行完畢),則這些任務會拖慢做業的總體執行進度。爲了不這種狀況發生,Hadoop採用了推測執行(Speculative Execution)機制,它根據必定的法則推測出「拖後腿」的任務,併爲這樣的任務啓動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最早成功運行完成任務的計算結果做爲最終結果。
set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;
複製代碼
建議:
若是用戶對於運行時的誤差很是敏感的話,那麼能夠將這些功能關閉掉。若是用戶由於輸入數據量很大而須要執行長時間的map或者Reduce task的話,那麼啓動推測執行形成的浪費是很是巨大大。
擴展閱讀