【SQL系列】深刻淺出數據倉庫中SQL性能優化之Hive篇

公衆號: SAP Technical
本文做者: matinal
 

 

前言部分

你們能夠關注個人公衆號,公衆號裏的排版更好,閱讀更溫馨。html

正文部分

一個Hive查詢生成多個Map Reduce Job,一個Map Reduce Job又有Map,Reduce,Spill,Shuffle,Sort等多個階段,因此針對Hive查詢的優化能夠大體分爲針對MR中單個步驟的優化(其中又會有細分),針對MR全局的優化,和針對整個查詢(多MR Job)的優化,下文會分別闡述。算法

 

在開始以前,先把MR的流程圖帖出來(摘自Hadoop權威指南),方便後面對照。另外要說明的是,這個優化只是針對Hive 0.9版本,而不是後來Hortonwork發起Stinger項目以後的版本。相對應的Hadoop版本是1.x而非2.x。sql

 

Map階段的優化(Map phase)

Map階段的優化,主要是肯定合適的Map數。那麼首先要了解Map數的計算公式:數據庫

 

[js]  view plain copy
 
 
 
  1. num_Map_tasks = max[${Mapred.min.split.size},  
  2.                 min(${dfs.block.size}, ${Mapred.max.split.size})]  

 

 

  • Mapred.min.split.size指的是數據的最小分割單元大小。
  • Mapred.max.split.size指的是數據的最大分割單元大小。
  • dfs.block.size指的是HDFS設置的數據塊大小。

 

通常來講dfs.block.size這個值是一個已經指定好的值,並且這個參數Hive是識別不到的: apache

 

[js]  view plain copy
 
 
 
  1. Hive> set dfs.block.size;  
  2. dfs.block.size is undefined  

 

因此實際上只有Mapred.min.split.size和Mapred.max.split.size這兩個參數(本節內容後面就以min和max指代這兩個參數)來決定Map數量。在Hive中min的默認值是1B,max的默認值是256MB: 性能優化

 

[js]  view plain copy
 
 
 
  1. Hive> set Mapred.min.split。size;  
  2. Mapred.min.split.size=1  
  3. Hive> set Mapred.max.split。size;  
  4. Mapred.max.split.size=256000000  

 

因此若是不作修改的話,就是1個Map task處理256MB數據,咱們就以調整max爲主。經過調整max能夠起到調整Map數的做用,減少max能夠增長Map數,增大max能夠減小Map數。須要提醒的是,直接調整Mapred.Map.tasks這個參數是沒有效果的。架構

調整大小的時機根據查詢的不一樣而不一樣,總的來說能夠經過觀察Map task的完成時間來肯定是否須要增長Map資源。若是Map task的完成時間都是接近1分鐘,甚至幾分鐘了,那麼每每增長Map數量,使得每一個Map task處理的數據量減小,可以讓Map task更快完成;而若是Map task的運行時間已經不多了,好比10-20秒,這個時候增長Map不太可能讓Map task更快完成,反而可能由於Map須要的初始化時間反而讓Job整體速度變慢,這個時候反而須要考慮是否能夠把Map的數量減小,這樣能夠節省更多資源給其餘Job。app

Reduce階段的優化(Reduce phase)

這裏說的Reduce階段,是指前面流程圖中的Reduce phase(實際的Reduce計算)而非圖中整個Reduce task。Reduce階段優化的主要工做也是選擇合適的Reduce task數量,跟上面的Map優化相似。jvm

與Map優化不一樣的是,Reduce優化時,能夠直接設置Mapred。Reduce。tasks參數從而直接指定Reduce的個數。固然直接指定Reduce個數雖然比較方便,可是不利於自動擴展。Reduce數的設置雖然相較Map更靈活,可是也能夠像Map同樣設定一個自動生成規則,這樣運行定時Job的時候就不用擔憂原來設置的固定Reduce數會因爲數據量的變化而不合適。分佈式

Hive估算Reduce數量的時候,使用的是下面的公式:

 

[js]  view plain copy
 
 
 
  1. num_Reduce_tasks = min[${Hive.exec.Reducers.max},   
  2.                       (${input.size} / ${ Hive.exec.Reducers.bytes.per.Reducer})]  

 

也就是說,根據輸入的數據量大小來決定Reduce的個數,默認Hive.exec.Reducers.bytes.per.Reducer爲1G,並且Reduce個數不能超過一個上限參數值,這個參數的默認取值爲999。因此咱們能夠調整Hive.exec.Reducers.bytes.per.Reducer來設置Reduce個數。

設置Reduce數一樣也是根據運行時間做爲參考調整,而且能夠根據特定的業務需求、工做負載類型總結出經驗,因此再也不贅述。

Map與Reduce之間的優化(Spill, copy, Sort phase)

Map phase和Reduce phase之間主要有3道工序。首先要把Map輸出的結果進行排序後作成中間文件,其次這個中間文件就能分發到各個Reduce,最後Reduce端在執行Reduce phase以前把收集到的排序子文件合併成一個排序文件。這個部分能夠調的參數挺多,可是通常都是不要調整的,沒必要重點關注。

Spill 與 Sort

在Spill階段,因爲內存不夠,數據可能沒辦法在內存中一次性排序完成,那麼就只能把局部排序的文件先保存到磁盤上,這個動做叫Spill,而後Spill出來的多個文件能夠在最後進行merge。若是發生Spill,能夠經過設置io.Sort.mb來增大Mapper輸出buffer的大小,避免Spill的發生。另外合併時能夠經過設置io.Sort.factor來使得一次性可以合併更多的數據。調試參數的時候,一個要看Spill的時間成本,一個要看merge的時間成本,還須要注意不要撐爆內存(io.Sort.mb是算在Map的內存裏面的)。Reduce端的merge也是同樣能夠用io.Sort.factor。通常狀況下這兩個參數不多須要調整,除非很明確知道這個地方是瓶頸。

Copy

copy階段是把文件從Map端copy到Reduce端。默認狀況下在5%的Map完成的狀況下Reduce就開始啓動copy,這個有時候是很浪費資源的,由於Reduce一旦啓動就被佔用,一直等到Map所有完成,收集到全部數據才能夠進行後面的動做,因此咱們能夠等比較多的Map完成以後再啓動Reduce流程,這個比例能夠通Mapred.Reduce.slowstart.completed.Maps去調整,他的默認值就是5%。若是以爲這麼作會減慢Reduce端copy的進度,能夠把copy過程的線程增大。tasktracker.http.threads能夠決定做爲server端的Map用於提供數據傳輸服務的線程,Mapred.Reduce.parallel.copies能夠決定做爲client端的Reduce同時從Map端拉取數據的並行度(一次同時從多少個Map拉數據),修改參數的時候這兩個注意協調一下,server端能處理client端的請求便可。

文件格式的優化

文件格式方面有兩個問題,一個是給輸入和輸出選擇合適的文件格式,另外一個則是小文件問題。小文件問題在目前的Hive環境下已經獲得了比較好的解決,Hive的默認配置中就能夠在小文件輸入時自動把多個文件合併給1個Map處理,輸出時若是文件很小也會進行一輪單獨的合併,因此這裏就不專門討論了。相關的參數能夠在這裏找到。

關於文件格式,Hive0.9版本有3種,textfile,sequencefile和rcfile。整體上來講,rcfile的壓縮比例和查詢時間稍好一點,因此推薦使用。

關於使用方法,能夠在建表結構時能夠指定格式,而後指定壓縮插入:

 

[js]  view plain copy
 
 
 
  1. create table rc_file_test( col int ) stored as rcfile;  
  2. set Hive.exec.compress.output = true;  
  3. insert overwrite table rc_file_test  
  4. select * from source_table;  

 

另外時也能夠指定輸出格式,也能夠經過Hive。default。fileformat來設定輸出格式,適用於create table as select的狀況: 

 

[js]  view plain copy
 
 
 
  1. set Hive.default.fileformat = SequenceFile;  
  2. set Hive.exec.compress.output = true;   
  3. /*對於sequencefile,有record和block兩種壓縮方式可選,block壓縮比更高*/  
  4. set Mapred.output.compression.type = BLOCK;   
  5. create table seq_file_test  
  6. as select * from source_table;  

 

上面的文件格式轉換,實際上是由Hive完成的(也就是插入動做)。可是也能夠由外部直接導入純文本(能夠按照這裏的作法預先壓縮),或者是由MapReduce Job生成的數據。

值得注意的是,Hive讀取sequencefile的時候,是把key忽略的,也就是直接讀value而且按照指定分隔符分隔字段。可是若是Hive的數據來源是從mr生成的,那麼寫sequencefile的時候,key和value都是有意義的,key不能被忽略,而是應該當成第一個字段。爲了解決這種不匹配的狀況,有兩種辦法。一種是要求凡是結果會給Hive用的mr Job輸出value的時候帶上key。可是這樣的話對於開發是一個負擔,讀寫數據的時候都要注意這個狀況。因此更好的方法是第二種,也就是把這個源自於Hive的問題交給Hive解決,寫一個InputFormat包裝一下,把value輸出加上key便可。如下是核心代碼,修改了RecordReader的next方法:

 

[js]  view plain copy
 
 
 
  1. public synchronized boolean next(K key, V value) throws IOException   
  2. {  
  3.     Text tKey = (Text) key;  
  4.     Text tValue = (Text) value;  
  5.     if (!super.next(innerKey, innerValue))   
  6.         return false;  
  7.   
  8.     Text inner_key = (Text) innerKey; //在構造函數中用createKey()生成  
  9.     Text inner_value = (Text) innerValue; //在構造函數中用createValue()生成  
  10.   
  11.     tKey.set(inner_key);  
  12.     tValue.set(inner_key.toString() + '\t' + inner_value.toString()); // 分隔符注意本身定義  
  13.     return true;  
  14. }  

 

Job總體優化

有一些問題必須從Job的總體角度去觀察。這裏討論幾個問題:Job執行模式(本地執行v.s.分佈式執行)、JVM重用、索引、Join算法、數據傾斜。

Job執行模式

Hadoop的Map Reduce Job能夠有3種模式執行,即本地模式,僞分佈式,還有真正的分佈式。本地模式和僞分佈式都是在最初學習Hadoop的時候每每被說成是作單機開發的時候用到。可是實際上對於處理數據量很是小的Job,直接啓動分佈式Job會消耗大量資源,而真正執行計算的時間反而很是少。這個時候就應該使用本地模式執行mr Job,這樣執行的時候不會啓動分佈式Job,執行速度就會快不少。好比通常來講啓動分佈式Job,不管多小的數據量,執行時間通常不會少於20s,而使用本地mr模式,10秒左右就能出結果。

設置執行模式的主要參數有三個,一個是Hive.exec.mode.local.auto,把他設爲true就可以自動開啓local mr模式。可是這還不足以啓動local mr,輸入的文件數量和數據量大小必需要控制,這兩個參數分別爲Hive.exec.mode.local.auto.tasks.max和Hive.exec.mode.local.auto.inputbytes.max,默認值分別爲4和128MB,即默認狀況下,Map處理的文件數不超過4個而且總大小小於128MB就啓用local mr模式。

JVM重用

正常狀況下,MapReduce啓動的JVM在完成一個task以後就退出了,可是若是任務花費時間很短,又要屢次啓動JVM的狀況下(好比對很大數據量進行計數操做),JVM的啓動時間就會變成一個比較大的overhead。在這種狀況下,可使用jvm重用的參數:

 

[js]  view plain copy
 
 
 
  1. set Mapred.Job.reuse.jvm.num.tasks = 5;  

 

他的做用是讓一個jvm運行屢次任務以後再退出。這樣一來也能節約很多JVM啓動時間。

索引

整體上來講,Hive的索引目前仍是一個不太適合使用的東西,這裏只是考慮到敘述完整性,對其進行基本的介紹。

Hive中的索引架構開放了一個接口,容許你根據這個接口去實現本身的索引。目前Hive本身有一個參考的索引實現(CompactIndex),後來在0.8版本中又加入位圖索引。這裏就講講CompactIndex。

CompactIndex的實現原理相似一個lookup table,而非傳統數據庫中的B樹。若是你對table A的col1作了索引,索引文件自己就是一個table,這個table會有3列,分別是col1的枚舉值,每一個值對應的數據文件位置,以及在這個文件位置中的偏移量。經過這種方式,能夠減小你查詢的數據量(偏移量能夠告訴你從哪一個位置開始找,天然只須要定位到相應的block),起到減小資源消耗的做用。可是就其性能來講,並無很大的改善,極可能還不如構建索引須要花的時間。因此在集羣資源充足的狀況下,沒有太大必要考慮索引。

CompactIndex的還有一個缺點就是使用起來不友好,索引建完以後,使用以前還須要根據查詢條件作一個一樣剪裁才能使用,索引的內部結構徹底暴露,並且還要花費額外的時間。具體看看下面的使用方法就瞭解了:

 

[js]  view plain copy
 
 
 
  1. /*在index_test_table表的id字段上建立索引*/  
  2. create index idx on table index_test_table(id)    
  3. as 'org.apache.Hadoop.Hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;  
  4. alter index idx on index_test_table rebuild;  
  5.       
  6. /*索引的剪裁。找到上面建的索引表,根據你最終要用的查詢條件剪裁一下。*/  
  7. /*若是你想跟RDBMS同樣建完索引就用,那是不行的,會直接報錯,這也是其麻煩的地方*/  
  8. create table my_index  
  9. as select _bucketname, `_offsets`  
  10. from default__index_test_table_idx__ where id = 10;  
  11.       
  12. /*如今能夠用索引了,注意最終查詢條件跟上面的剪裁條件一致*/  
  13. set Hive.index.compact.file = /user/Hive/warehouse/my_index;   
  14. set Hive.input.format = org.apache.Hadoop.Hive.ql.index.compact.HiveCompactIndexInputFormat;  
  15. select count(*) from index_test_table where id = 10;  

 

Join算法

處理分佈式join,通常有兩種方法:

 

  • replication join:把其中一個表複製到全部節點,這樣另外一個表在每一個節點上面的分片就能夠跟這個完整的表join了;
  • repartition join:把兩份數據按照join key進行hash重分佈,讓每一個節點處理hash值相同的join key數據,也就是作局部的join。

 

這兩種方式在M/R Job中分別對應了Map side join和Reduce side join。在一些MPP DB中,數據能夠按照某列字段預先進行hash分佈,這樣在跟這個表以這個字段爲join key進行join的時候,該表確定不須要作數據重分佈了,這種功能是以HDFS做爲底層文件系統的Hive所沒有的。

在默認狀況下,Hive的join策略是進行Reduce side join。當兩個表中有一個是小表的時候,就能夠考慮用Map join了,由於小表複製的代價會好過大表Shuffle的代價。使用Map join的配置方法有兩種,一種直接在sql中寫hint,語法是/*+MapJOIN (tbl)*/,其中tbl就是你想要作replication的表。另外一種方法是設置Hive.auto.convert.join = true,這樣Hive會自動判斷當前的join操做是否合適作Map join,主要是找join的兩個表中有沒有小表。至於多大的表算小表,則是由Hive.smalltable.filesize決定,默認25MB。

可是有的時候,沒有一個表足夠小到可以放進內存,可是仍是想用Map join怎麼辦?這個時候就要用到bucket Map join。其方法是兩個join表在join key上都作hash bucket,而且把你打算複製的那個(相對)小表的bucket數設置爲大表的倍數。這樣數據就會按照join key作hash bucket。小表依然複製到全部節點,Map join的時候,小表的每一組bucket加載成hashtable,與對應的一個大表bucket作局部join,這樣每次只須要加載部分hashtable就能夠了。

而後在兩個表的join key都具備惟一性的時候(也就是可作主鍵),還能夠進一步作Sort merge bucket Map join。作法仍是兩邊要作hash bucket,並且每一個bucket內部要進行排序。這樣一來當兩邊bucket要作局部join的時候,只須要用相似merge Sort算法中的merge操做同樣把兩個bucket順序遍歷一遍便可完成,這樣甚至都不用把一個bucket完整的加載成hashtable,這對性能的提高會有很大幫助。

而後這裏以一個完整的實驗說明這幾種join算法如何操做。

首先建表要帶上bucket:

 

[js]  view plain copy
 
 
 
  1. create table Map_join_test(id int)  
  2. clustered by (id) Sorted by (id) into 32 buckets  
  3. stored as textfile;  

 

而後插入咱們準備好的800萬行數據,注意要強制劃分紅bucket(也就是用Reduce劃分hash值相同的數據到相同的文件): 

 

[js]  view plain copy
 
 
 
  1. set Hive.enforce.bucketing = true;  
  2. insert overwrite table Map_join_test  
  3. select * from Map_join_source_data;  

 

這樣這個表就有了800萬id值(且裏面沒有重複值,因此能夠作Sort merge),佔用80MB左右。

接下來咱們就能夠一一嘗試Map join的算法了。首先是普通的Map join:

 

[js]  view plain copy
 
 
 
  1. select /*+Mapjoin(a) */count(*)  
  2. from Map_join_test a  
  3. join Map_join_test b on a.id = b.id;  

 

而後就會看到分發hash table的過程: 

 

[js]  view plain copy
 
 
 
  1. 2013-08-31 09:08:43     Starting to launch local task to process Map join;      maximum memory = 1004929024  
  2. 2013-08-31 09:08:45     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38823016        rate:   0.039  
  3. 2013-08-31 09:08:46     Processing rows:   300000  Hashtable size: 299999  Memory usage:   56166968        rate:   0.056  
  4. ……  
  5. 2013-08-31 09:12:39     Processing rows:  4900000 Hashtable size: 4899999 Memory usage:   896968104       rate:   0.893  
  6. 2013-08-31 09:12:47     Processing rows:  5000000 Hashtable size: 4999999 Memory usage:   922733048       rate:   0.918  
  7. Execution failed with exit status: 2  
  8. Obtaining error information  
  9.   
  10. Task failed!  
  11. Task ID:  
  12.   Stage-4  

 

不幸的是,竟然內存不夠了,直接作Map join失敗了。可是80MB的大小爲什麼用1G的heap size都放不下?觀察整個過程就會發現,平均一條記錄須要用到200字節的存儲空間,這個overhead太大了,對於Map join的小表size必定要好好評估,若是有幾十萬記錄數就要當心了。雖然不太清楚其中的構造原理,可是在互聯網上也能找到其餘的例證,好比這裏和這裏,平均一行500字節左右。這個明顯比通常的表一行佔用的數據量要大。不過Hive也在作這方面的改進,爭取縮小hash table,好比Hive-6430。

因此接下來咱們就用bucket Map join,以前分的bucket就派上用處了。只須要在上述sql的前面加上以下的設置:

 

[js]  view plain copy
 
 
 
  1. set Hive。optimize。bucketMapjoin = true;  

 

而後仍是會看到hash table分發: 

 

[js]  view plain copy
 
 
 
  1. 2013-08-31 09:20:39     Starting to launch local task to process Map join;      maximum memory = 1004929024  
  2. 2013-08-31 09:20:41     Processing rows:   200000  Hashtable size: 199999  Memory usage:   38844832        rate:   0.039  
  3. 2013-08-31 09:20:42     Processing rows:   275567  Hashtable size: 275567  Memory usage:   51873632        rate:   0.052  
  4. 2013-08-31 09:20:42     Dump the hashtable into file: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0。hashtable  
  5. 2013-08-31 09:20:46     Upload 1 File to: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0。hashtable File size: 11022975  
  6. 2013-08-31 09:20:47     Processing rows:   300000  Hashtable size: 24432   Memory usage:   8470976 rate:   0.008  
  7. 2013-08-31 09:20:47     Processing rows:   400000  Hashtable size: 124432  Memory usage:   25368080        rate:   0.025  
  8. 2013-08-31 09:20:48     Processing rows:   500000  Hashtable size: 224432  Memory usage:   42968080        rate:   0.043  
  9. 2013-08-31 09:20:49     Processing rows:   551527  Hashtable size: 275960  Memory usage:   52022488        rate:   0.052  
  10. 2013-08-31 09:20:49     Dump the hashtable into file: file:/tmp/Hadoop/Hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0。hashtable  
  11. ……  

 

此次就會看到每次構建完一個hash table(也就是所對應的對應一個bucket),會把這個hash table寫入文件,從新構建新的hash table。這樣一來因爲每一個hash table的量比較小,也就不會有內存不足的問題,整個sql也能成功運行。不過光光是這個複製動做就要花去3分半的時間,因此若是整個Job原本就花不了多少時間的,那這個時間就不可小視。    

最後咱們試試Sort merge bucket Map join,在bucket Map join的基礎上加上下面的設置便可:

 

[js]  view plain copy
 
 
 
  1. set Hive.optimize.bucketMapjoin.Sortedmerge = true;  
  2. set Hive.input.format = org.apache.Hadoop.Hive.ql.io.BucketizedHiveInputFormat;  

 

Sort merge bucket Map join是不會產生hash table複製的步驟的,直接開始作實際Map端join操做了,數據在join的時候邊作邊讀。跳過複製的步驟,外加join算法的改進,使得Sort merge bucket Map join的效率要明顯好於bucket Map join。

關於join的算法雖然有這麼些選擇,可是我的以爲,對於平常使用,掌握默認的Reduce join和普通的(無bucket)Map join已經能解決大多數問題。若是小表不能徹底放內存,可是小表相對大表的size量級差異也很是大的時候也能夠試試bucket Map join,不過其hash table分發的過程會浪費很多時間,須要評估下是否可以比Reduce join更高效。而Sort merge bucket Map join雖然性能不錯,可是把數據作成bucket自己也須要時間,另外其發動條件比較特殊,就是兩邊join key必須都惟一(不少介紹資料中都不提這一點。強調下必須都是惟一,哪怕只有一個表不惟一,出來的結果也是錯的。固然,其實這點徹底能夠根據其算法原理推敲出來)。這樣的場景相對比較少見,「用戶基本表 join 用戶擴展表」以及「用戶今天的數據快照 join 用戶昨天的數據快照」這類場景可能比較合適。

這裏順便說個題外話,在數據倉庫中,小表每每是維度表,而小表Map join這件事情其實用udf代替還會更快,由於不用單獨啓動一輪Job,因此這也是一種可選方案。固然前提條件是維度表是固定的天然屬性(好比日期),只增長不修改(好比網站的頁面編號)的狀況也能夠考慮。若是維度有更新,要作緩慢變化維的,固然仍是維表好維護。至於維表本來的一個主要用途OLAP,以Hive目前的性能是無法實現的,也就不須要多慮了。

數據傾斜

所謂數據傾斜,說的是因爲數據分佈不均勻,個別值集中佔據大部分數據量,加上Hadoop的計算模式,致使計算資源不均勻引發性能降低。下圖就是一個例子:

 

仍是拿網站的訪問日誌說事吧。假設網站訪問日誌中會記錄用戶的user_id,而且對於註冊用戶使用其用戶表的user_id,對於非註冊用戶使用一個user_id=0表明。那麼鑑於大多數用戶是非註冊用戶(只看不寫),因此user_id=0佔據了絕大多數。而若是進行計算的時候若是以user_id做爲group by的維度或者是join key,那麼個別Reduce會收到比其餘Reduce多得多的數據——由於它要接收全部user_id=0的記錄進行處理,使得其處理效果會很是差,其餘Reduce都跑完好久了它還在運行。

傾斜分紅group by形成的傾斜和join形成的傾斜,須要分開看。

group by形成的傾斜有兩個參數能夠解決,一個是Hive.Map.aggr,默認值已經爲true,意思是會作Map端的combiner。因此若是你的group by查詢只是作count(*)的話,實際上是看不出傾斜效果的,可是若是你作的是count(distinct),那麼仍是會看出一點傾斜效果。另外一個參數是Hive.groupby. skewindata。這個參數的意思是作Reduce操做的時候,拿到的key並非全部相同值給同一個Reduce,而是隨機分發,而後Reduce作聚合,作完以後再作一輪MR,拿前面聚合過的數據再算結果。因此這個參數其實跟Hive.Map.aggr作的是相似的事情,只是拿到Reduce端來作,並且要額外啓動一輪Job,因此其實不怎麼推薦用,效果不明顯。

若是說要改寫SQL來優化的話,能夠按照下面這麼作:

 

[js]  view plain copy
 
 
 
  1. /*改寫前*/  
  2. select a, count(distinct b) as c from tbl group by a;  
  3. /*改寫後*/  
  4. select a, count(*) as c  
  5. from (select distinct a, b from tbl) group by a;  

 

join形成的傾斜,就好比上面描述的網站訪問日誌和用戶表兩個表join: 

 

[js]  view plain copy
 
 
 
  1. select a.* from logs a join users b on a。user_id = b.user_id;  

 

Hive給出的解決方案叫skew join,其原理把這種user_id = 0的特殊值先不在Reduce端計算掉,而是先寫入hdfs,而後啓動一輪Map join專門作這個特殊值的計算,指望能提升計算這部分值的處理速度。固然你要告訴Hive這個join是個skew join,即:

 

[js]  view plain copy
 
 
 
  1. set Hive.optimize.skewjoin = true;  

 

還有要告訴Hive如何判斷特殊值,根據Hive.skewjoin.key設置的數量Hive能夠知道,好比默認值是100000,那麼超過100000條記錄的值就是特殊值。

skew join的流程能夠用下圖描述:

 

另外對於特殊值的處理每每跟業務有關係,因此也能夠從業務角度重寫sql解決。好比前面這種傾斜join,能夠把特殊值隔離開來(從業務角度說,users表應該不存在user_id = 0的狀況,可是這裏仍是假設有這個值,使得這個寫法更加具備通用性): 

 

[js]  view plain copy
 
 
 
  1. select a.* from   
  2. (  
  3. select a.*  
  4. from (select * from logs where user_id = 0)  a   
  5. join (select * from users where user_id = 0) b   
  6. on a。user_id =  b。user_id  
  7. union all  
  8. select a.*   
  9. from logs a join users b  
  10. on a。user_id <> 0 and a。user_id = b.user_id  
  11. )t;  

 

數據傾斜不只僅是Hive的問題,實際上是share nothing架構下必然會碰到的數據分佈問題,對此學界也有專門的研究,好比skewtune。

SQL總體優化

前面對於單個Job如何作優化已經作過詳細討論,可是Hive查詢會生成多個Job,針對多個Job,有什麼地方須要優化?

Job間並行

首先,在Hive生成的多個Job中,在有些狀況下Job之間是能夠並行的,典型的就是子查詢。當須要執行多個子查詢union all或者join操做的時候,Job間並行就可使用了。好比下面的代碼就是一個能夠並行的場景示意:

 

[js]  view plain copy
 
 
 
  1. select * from   
  2. (  
  3.    select count(*) from logs   
  4.    where log_date = 20130801 and item_id = 1  
  5.    union all   
  6.    select count(*) from logs   
  7.    where log_date = 20130802 and item_id = 2  
  8.    union all   
  9.    select count(*) from logs   
  10.    where log_date = 20130803 and item_id = 3  
  11. )t  

 

設置Job間並行的參數是Hive.exec.parallel,將其設爲true便可。默認的並行度爲8,也就是最多容許sql中8個Job並行。若是想要更高的並行度,能夠經過Hive.exec.parallel. thread.number參數進行設置,但要避免設置過大而佔用過多資源。

減小Job數

另外在實際開發過程當中也發現,一些實現思路會致使生成多餘的Job而顯得不夠高效。好比這個需求:查詢某網站日誌中訪問過頁面a和頁面b的用戶數量。低效的思路是面向明細的,先取出看過頁面a的用戶,再取出看過頁面b的用戶,而後取交集,代碼以下:

 

[js]  view plain copy
 
 
 
  1. select count(*)   
  2. from   
  3. (select distinct user_id   
  4. from logs where page_name = ‘a’) a  
  5. join   
  6. (select distinct user_id   
  7. from logs where blog_owner = ‘b’) b   
  8. on a.user_id = b.user_id;  

 

這樣一來,就要產生2個求子查詢的Job,一個用於關聯的Job,還有一個計數的Job,一共有4個Job。

可是咱們直接用面向統計的方法去計算的話(也就是用group by替代join),則會更加符合M/R的模式,並且生成了一個徹底不帶子查詢的sql,只須要用一個Job就能跑完:

 

[js]  view plain copy
 
 
 
  1. select count(*)   
  2. from logs group by user_id  
  3. having (count(case when page_name = ‘a’ then 1 end) > 0  
  4.     and count(case when page_name = ‘b’ then 1 end) > 0)  

 

第一種查詢方法符合思考問題的直覺,是工程師和分析師在實際查數據中最早想到的寫法,可是若是在目前Hive的query planner不是那麼智能的狀況下,想要更加快速的跑出結果,懂一點工具的內部機理也是必須的。(做者:孫逸 / 審校:劉亞瓊)

相關文章
相關標籤/搜索