2018-12-27 10:00:12數據庫
Hive是基於Hadoop平臺的,它提供了相似SQL同樣的查詢語言HQL。有了Hive,若是使用過SQL語言,而且不理解Hadoop MapReduce運行原理,也就沒法經過編程來實現MR,可是你仍然能夠很容易地編寫出特定查詢分析的HQL語句,經過使用相似SQL的語法,將HQL查詢語句提交Hive系統執行查詢分析,最終Hive會幫你轉換成底層Hadoop可以理解的MR Job。
對於最基本的HQL查詢咱們再也不累述,這裏主要說明Hive中進行統計分析時使用到的JOIN操做。在說明Hive JOIN以前,咱們先簡單說明一下,Hadoop執行MR Job的基本過程(運行機制),能更好的幫助咱們理解HQL轉換到底層的MR Job後是如何執行的。咱們重點說明MapReduce執行過程當中,從Map端到Reduce端這個過程(Shuffle)的執行狀況,如圖所示(來自《Hadoop: The Definitive Guide》):
基本執行過程,描述以下:apache
- 一個InputSplit輸入到map,會運行咱們實現的Mapper的處理邏輯,對數據進行映射操做。
- map輸出時,會首先將輸出中間結果寫入到map自帶的buffer中(buffer默認大小爲100M,能夠經過io.sort.mb配置)。
- map自帶的buffer使用容量達到必定門限(默認0.80或80%,能夠經過io.sort.spill.percent配置),一個後臺線程會準備將buffer中的數據寫入到磁盤。
- 這個後臺線程在將buffer中數據寫入磁盤以前,會首先將buffer中的數據進行partition(分區,partition數爲Reducer的個數),對於每一個的數據會基於Key進行一個in-memory排序。
- 排序後,會檢查是否配置了Combiner,若是配置了則直接做用到已排序的每一個partition的數據上,對map輸出進行化簡壓縮(這樣寫入磁盤的數據量就會減小,下降I/O操做開銷)。
- 如今能夠將通過處理的buffer中的數據寫入磁盤,生成一個文件(每次buffer容量達到設置的門限,都會對應着一個寫入到磁盤的文件)。
- map任務結束以前,會對輸出的多個文件進行合併操做,合併成一個文件(若map輸出至少3個文件,在多個文件合併後寫入以前,若是配置了Combiner,則會運行來化簡壓縮輸出的數據,文件個數能夠經過min.num.splits.for.combine配置;若是指定了壓縮map輸出,這裏會根據配置對數據進行壓縮寫入磁盤),這個文件仍然保持partition和排序的狀態。
- reduce階段,每一個reduce任務開始從多個map上拷貝屬於本身partition(map階段已經作好partition,並且每一個reduce任務知道應該拷貝哪一個partition;拷貝過程是在不一樣節點之間,Reducer上拷貝線程基於HTTP來經過網絡傳輸數據)。
- 每一個reduce任務拷貝的map任務結果的指定partition,也是先將數據放入到自帶的一個buffer中(buffer默認大小爲Heap內存的70%,能夠經過mapred.job.shuffle.input.buffer.percent配置),若是配置了map結果進行壓縮,則這時要先將數據解壓縮後放入buffer中。
- reduce自帶的buffer使用容量達到必定門限(默認0.66或66%,能夠經過mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的輸出的數量達到必定門限(默認1000,能夠經過mapred.inmem.merge.threshold配置),buffer中的數據將會被寫入到磁盤中。
- 在將buffer中多個map輸出合併寫入磁盤以前,若是設置了Combiner,則會化簡壓縮合並的map輸出。
- 當屬於該reducer的map輸出所有拷貝完成,則會在reducer上生成多個文件,這時開始執行合併操做,並保持每一個map輸出數據中Key的有序性,將多個文件合併成一個文件(在reduce端可能存在buffer和磁盤上都有數據的狀況,這樣在buffer中的數據能夠減小必定量的I/O寫入操做開銷)。
- 最後,執行reduce階段,運行咱們實現的Reducer中化簡邏輯,最終將結果直接輸出到HDFS中(由於Reducer運行在DataNode上,輸出結果的第一個replica直接在存儲在本地節點上)。
經過上面的描述咱們看到,在MR執行過程當中,存在Shuffle過程的MR須要在網絡中的節點之間(Mapper節點和Reducer節點)拷貝數據,若是傳輸的數據量很大會形成必定的網絡開銷。並且,Map端和Reduce端都會經過一個特定的buffer來在內存中臨時緩存數據,若是沒法根據實際應用場景中數據的規模來使用Hive,尤爲是執行表的JOIN操做,有可能很浪費資源,下降了系統處理任務的效率,還可能由於內存不足形成OOME問題,致使計算任務失敗。
下面,咱們說明Hive中的JOIN操做,針對不一樣的JOIN方式,應該如何來實現和優化:編程
生成一個MR Job緩存
多表鏈接,若是多個表中每一個表都使用同一個列進行鏈接(出如今JOIN子句中),則只會生成一個MR Job,例如:網絡
1 |
SELECT a.val, b.val, c.val FROM a JOIN b ON (a. key = b.key1) JOIN c ON (c. key = b.key1) |
三個表a、b、c都分別使用了同一個字段進行鏈接,亦即同一個字段同時出如今兩個JOIN子句中,從而只生成一個MR Job。app
生成多個MR Jobide
多表鏈接,若是多表中,其中存在一個表使用了至少2個字段進行鏈接(同一個表的至少2個列出如今JOIN子句中),則會至少生成2個MR Job,例如:oop
1 |
SELECT a.val, b.val, c.val FROM a JOIN b ON (a. key = b.key1) JOIN c ON (c. key = b.key2) |
三個表基於2個字段進行鏈接,這兩個字段b.key1和b.key2同時出如今b表中。鏈接的過程是這樣的:首先a和b表基於a.key和b.key1進行鏈接,對應着第一個MR Job;表a和b鏈接的結果,再和c進行鏈接,對應着第二個MR Job。優化
錶鏈接順序優化ui
多表鏈接,會轉換成多個MR Job,每個MR Job在Hive中稱爲JOIN階段(Stage)。在每個Stage,按照JOIN順序中的最後一個表應該儘可能是大表,由於JOIN前一階段生成的數據會存在於Reducer的buffer中,經過stream最後面的表,直接從Reducer的buffer中讀取已經緩衝的中間結果數據(這個中間結果數據多是JOIN順序中,前面錶鏈接的結果的Key,數據量相對較小,內存開銷就小),這樣,與後面的大表進行鏈接時,只須要從buffer中讀取緩存的Key,與大表中的指定Key進行鏈接,速度會更快,也可能避免內存緩衝區溢出。例如:
1 |
SELECT a.val, b.val, c.val FROM a JOIN b ON (a. key = b.key1) JOIN c ON (c. key = b.key1) |
這個JOIN語句,會生成一個MR Job,在選擇JOIN順序的時候,數據量相比應該是b < c,表a和b基於a.key = b.key1進行鏈接,獲得的結果(基於a和b進行鏈接的Key)會在Reducer上緩存在buffer中,在與c進行鏈接時,從buffer中讀取Key(a.key=b.key1)來與表c的c.key進行鏈接。
另外,也能夠經過給出一些Hint信息來啓發JOIN操做,這指定了將哪一個表做爲大表,從而獲得優化。例如:
1 |
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a. key = b.key1) JOIN c ON (c. key = b.key1) |
上述JOIN語句中,a表被視爲大表,則首先會對錶b和c進行JOIN,而後再將獲得的結果與表a進行JOIN。
基於條件的LEFT OUTER JOIN優化
左鏈接時,左表中出現的JOIN字段都保留,右表沒有鏈接上的都爲空。對於帶WHERE條件的JOIN語句,例如:
1 |
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a. key =b. key ) |
2 |
WHERE a.ds= '2009-07-07' AND b.ds= '2009-07-07' |
執行順序是,首先完成2表JOIN,而後再經過WHERE條件進行過濾,這樣在JOIN過程當中可能會輸出大量結果,再對這些結果進行過濾,比較耗時。能夠進行優化,將WHERE條件放在ON後,例如:
1 |
SELECT a.val, b.val FROM a LEFT OUTER JOIN b |
2 |
ON (a. key =b. key AND b.ds= '2009-07-07' AND a.ds= '2009-07-07' ) |
這樣,在JOIN的過程當中,就對不知足條件的記錄進行了預先過濾,可能會有更好的表現。
左半鏈接(LEFT SEMI JOIN)
左半鏈接實現了相似IN/EXISTS的查詢語義,使用關係數據庫子查詢的方式實現查詢SQL,例如:
1 |
SELECT a. key , a.value FROM a WHERE a. key IN ( SELECT b. key FROM b); |
使用Hive對應於以下語句:
1 |
SELECT a. key , a.val FROM a LEFT SEMI JOIN b ON (a. key = b. key ) |
須要注意的是,在LEFT SEMI JOIN中,表b只能出如今ON子句後面,不可以出如今SELECT和WHERE子句中。
關於子查詢,這裏提一下,Hive支持狀況以下:
- 在0.12版本,只支持FROM子句中的子查詢;
- 在0.13版本,也支持WHERE子句中的子查詢。
Map Side JOIN
Map Side JOIN優化的出發點是,Map任務輸出後,不須要將數據拷貝到Reducer節點,下降的數據在網絡節點之間傳輸的開銷。
多表鏈接,若是隻有一個表比較大,其餘表都很小,則JOIN操做會轉換成一個只包含Map的Job,例如:
1 |
SELECT /*+ MAPJOIN(b) */ a. key , a.value FROM a JOIN b ON a. key = b. key |
對於表a數據的每個Map,都可以徹底讀取表b的數據。這裏,表a與b不容許執行FULL OUTER JOIN、RIGHT OUTER JOIN。
BUCKET Map Side JOIN
咱們先看兩個表a和b的DDL,表a爲:
1 |
CREATE TABLE a( key INT , othera STRING) |
2 |
CLUSTERED BY ( key ) INTO 4 BUCKETS |
3 |
ROW FORMAT DELIMITED |
4 |
FIELDS TERMINATED BY '\001' |
5 |
COLLECTION ITEMS TERMINATED BY '\002' |
6 |
MAP KEYS TERMINATED BY '\003' |
7 |
STORED AS SEQUENCEFILE; |
表b爲:
1 |
CREATE TABLE b( key INT , otherb STRING) |
2 |
CLUSTERED BY ( key ) INTO 32 BUCKETS |
3 |
ROW FORMAT DELIMITED |
4 |
FIELDS TERMINATED BY '\001' |
5 |
COLLECTION ITEMS TERMINATED BY '\002' |
6 |
MAP KEYS TERMINATED BY '\003' |
7 |
STORED AS SEQUENCEFILE; |
如今要基於a.key和b.key進行JOIN操做,此時JOIN列同時也是BUCKET列,JOIN語句以下:
1 |
SELECT /*+ MAPJOIN(b) */ a. key , a.value FROM a JOIN b ON a. key = b. key |
而且表a有4個BUCKET,表b有32個BUCKET,默認狀況下,對於表a的每個BUCKET,都會去獲取表b中的每個BUCKET來進行JOIN,這回形成必定的開銷,由於只有表b中知足JOIN條件的BUCKET纔會真正與表a的BUCKET進行鏈接。
這種默認行爲能夠進行優化,經過改變默認JOIN行爲,只須要設置變量:
1 |
set hive.optimize.bucketmapjoin = true |
這樣,JOIN的過程是,表a的BUCKET 1只會與表b中的BUCKET 1進行JOIN,而再也不考慮表b中的其餘BUCKET 2~32。
若是上述表具備相同的BUCKET,如都是32個,並且仍是排序的,亦即,在表定義中在CLUSTERED BY(key)後面增長以下約束:
1 |
SORTED BY ( key ) |
則上述JOIN語句會執行一個Sort-Merge-Bucket (SMB) JOIN,一樣須要設置以下參數來改變默認行爲,優化JOIN時只遍歷相關的BUCKET便可:
1 |
set hive.input. format =org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; |
2 |
set hive.optimize.bucketmapjoin = true ; |
3 |
set hive.optimize.bucketmapjoin.sortedmerge = true ; |