Hive中的join操做原理和優化

2018-12-27 10:00:12數據庫

Hive是基於Hadoop平臺的,它提供了相似SQL同樣的查詢語言HQL。有了Hive,若是使用過SQL語言,而且不理解Hadoop MapReduce運行原理,也就沒法經過編程來實現MR,可是你仍然能夠很容易地編寫出特定查詢分析的HQL語句,經過使用相似SQL的語法,將HQL查詢語句提交Hive系統執行查詢分析,最終Hive會幫你轉換成底層Hadoop可以理解的MR Job。
對於最基本的HQL查詢咱們再也不累述,這裏主要說明Hive中進行統計分析時使用到的JOIN操做。在說明Hive JOIN以前,咱們先簡單說明一下,Hadoop執行MR Job的基本過程(運行機制),能更好的幫助咱們理解HQL轉換到底層的MR Job後是如何執行的。咱們重點說明MapReduce執行過程當中,從Map端到Reduce端這個過程(Shuffle)的執行狀況,如圖所示(來自《Hadoop: The Definitive Guide》):
hadoop-mr-shuffle
基本執行過程,描述以下:apache

  1. 一個InputSplit輸入到map,會運行咱們實現的Mapper的處理邏輯,對數據進行映射操做。
  2. map輸出時,會首先將輸出中間結果寫入到map自帶的buffer中(buffer默認大小爲100M,能夠經過io.sort.mb配置)。
  3. map自帶的buffer使用容量達到必定門限(默認0.80或80%,能夠經過io.sort.spill.percent配置),一個後臺線程會準備將buffer中的數據寫入到磁盤。
  4. 這個後臺線程在將buffer中數據寫入磁盤以前,會首先將buffer中的數據進行partition(分區,partition數爲Reducer的個數),對於每一個的數據會基於Key進行一個in-memory排序。
  5. 排序後,會檢查是否配置了Combiner,若是配置了則直接做用到已排序的每一個partition的數據上,對map輸出進行化簡壓縮(這樣寫入磁盤的數據量就會減小,下降I/O操做開銷)。
  6. 如今能夠將通過處理的buffer中的數據寫入磁盤,生成一個文件(每次buffer容量達到設置的門限,都會對應着一個寫入到磁盤的文件)。
  7. map任務結束以前,會對輸出的多個文件進行合併操做,合併成一個文件(若map輸出至少3個文件,在多個文件合併後寫入以前,若是配置了Combiner,則會運行來化簡壓縮輸出的數據,文件個數能夠經過min.num.splits.for.combine配置;若是指定了壓縮map輸出,這裏會根據配置對數據進行壓縮寫入磁盤),這個文件仍然保持partition和排序的狀態。
  8. reduce階段,每一個reduce任務開始從多個map上拷貝屬於本身partition(map階段已經作好partition,並且每一個reduce任務知道應該拷貝哪一個partition;拷貝過程是在不一樣節點之間,Reducer上拷貝線程基於HTTP來經過網絡傳輸數據)。
  9. 每一個reduce任務拷貝的map任務結果的指定partition,也是先將數據放入到自帶的一個buffer中(buffer默認大小爲Heap內存的70%,能夠經過mapred.job.shuffle.input.buffer.percent配置),若是配置了map結果進行壓縮,則這時要先將數據解壓縮後放入buffer中。
  10. reduce自帶的buffer使用容量達到必定門限(默認0.66或66%,能夠經過mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的輸出的數量達到必定門限(默認1000,能夠經過mapred.inmem.merge.threshold配置),buffer中的數據將會被寫入到磁盤中。
  11. 在將buffer中多個map輸出合併寫入磁盤以前,若是設置了Combiner,則會化簡壓縮合並的map輸出。
  12. 當屬於該reducer的map輸出所有拷貝完成,則會在reducer上生成多個文件,這時開始執行合併操做,並保持每一個map輸出數據中Key的有序性,將多個文件合併成一個文件(在reduce端可能存在buffer和磁盤上都有數據的狀況,這樣在buffer中的數據能夠減小必定量的I/O寫入操做開銷)。
  13. 最後,執行reduce階段,運行咱們實現的Reducer中化簡邏輯,最終將結果直接輸出到HDFS中(由於Reducer運行在DataNode上,輸出結果的第一個replica直接在存儲在本地節點上)。

經過上面的描述咱們看到,在MR執行過程當中,存在Shuffle過程的MR須要在網絡中的節點之間(Mapper節點和Reducer節點)拷貝數據,若是傳輸的數據量很大會形成必定的網絡開銷。並且,Map端和Reduce端都會經過一個特定的buffer來在內存中臨時緩存數據,若是沒法根據實際應用場景中數據的規模來使用Hive,尤爲是執行表的JOIN操做,有可能很浪費資源,下降了系統處理任務的效率,還可能由於內存不足形成OOME問題,致使計算任務失敗。
下面,咱們說明Hive中的JOIN操做,針對不一樣的JOIN方式,應該如何來實現和優化:編程

生成一個MR Job緩存

多表鏈接,若是多個表中每一個表都使用同一個列進行鏈接(出如今JOIN子句中),則只會生成一個MR Job,例如:網絡

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

三個表a、b、c都分別使用了同一個字段進行鏈接,亦即同一個字段同時出如今兩個JOIN子句中,從而只生成一個MR Job。app

生成多個MR Jobide

多表鏈接,若是多表中,其中存在一個表使用了至少2個字段進行鏈接(同一個表的至少2個列出如今JOIN子句中),則會至少生成2個MR Job,例如:oop

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key2)

三個表基於2個字段進行鏈接,這兩個字段b.key1和b.key2同時出如今b表中。鏈接的過程是這樣的:首先a和b表基於a.key和b.key1進行鏈接,對應着第一個MR Job;表a和b鏈接的結果,再和c進行鏈接,對應着第二個MR Job。優化

錶鏈接順序優化ui

多表鏈接,會轉換成多個MR Job,每個MR Job在Hive中稱爲JOIN階段(Stage)。在每個Stage,按照JOIN順序中的最後一個表應該儘可能是大表,由於JOIN前一階段生成的數據會存在於Reducer的buffer中,經過stream最後面的表,直接從Reducer的buffer中讀取已經緩衝的中間結果數據(這個中間結果數據多是JOIN順序中,前面錶鏈接的結果的Key,數據量相對較小,內存開銷就小),這樣,與後面的大表進行鏈接時,只須要從buffer中讀取緩存的Key,與大表中的指定Key進行鏈接,速度會更快,也可能避免內存緩衝區溢出。例如:

1 SELECT a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON (c.key = b.key1)

這個JOIN語句,會生成一個MR Job,在選擇JOIN順序的時候,數據量相比應該是b < c,表a和b基於a.key = b.key1進行鏈接,獲得的結果(基於a和b進行鏈接的Key)會在Reducer上緩存在buffer中,在與c進行鏈接時,從buffer中讀取Key(a.key=b.key1)來與表c的c.key進行鏈接。
另外,也能夠經過給出一些Hint信息來啓發JOIN操做,這指定了將哪一個表做爲大表,從而獲得優化。例如:

1 SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM JOIN ON (a.key = b.key1) JOIN ON(c.key = b.key1)

上述JOIN語句中,a表被視爲大表,則首先會對錶b和c進行JOIN,而後再將獲得的結果與表a進行JOIN。

基於條件的LEFT OUTER JOIN優化

左鏈接時,左表中出現的JOIN字段都保留,右表沒有鏈接上的都爲空。對於帶WHERE條件的JOIN語句,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN ON (a.key=b.key)
2 WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'

執行順序是,首先完成2表JOIN,而後再經過WHERE條件進行過濾,這樣在JOIN過程當中可能會輸出大量結果,再對這些結果進行過濾,比較耗時。能夠進行優化,將WHERE條件放在ON後,例如:

1 SELECT a.val, b.val FROM LEFT OUTER JOIN b
2 ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07')

這樣,在JOIN的過程當中,就對不知足條件的記錄進行了預先過濾,可能會有更好的表現。

左半鏈接(LEFT SEMI JOIN)

左半鏈接實現了相似IN/EXISTS的查詢語義,使用關係數據庫子查詢的方式實現查詢SQL,例如:

1 SELECT a.key, a.value FROM WHERE a.key IN (SELECT b.key FROM b);

使用Hive對應於以下語句:

1 SELECT a.key, a.val FROM LEFT SEMI JOIN ON (a.key = b.key)

須要注意的是,在LEFT SEMI JOIN中,表b只能出如今ON子句後面,不可以出如今SELECT和WHERE子句中。
關於子查詢,這裏提一下,Hive支持狀況以下:

  • 在0.12版本,只支持FROM子句中的子查詢;
  • 在0.13版本,也支持WHERE子句中的子查詢。

Map Side JOIN

Map Side JOIN優化的出發點是,Map任務輸出後,不須要將數據拷貝到Reducer節點,下降的數據在網絡節點之間傳輸的開銷。
多表鏈接,若是隻有一個表比較大,其餘表都很小,則JOIN操做會轉換成一個只包含Map的Job,例如:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

對於表a數據的每個Map,都可以徹底讀取表b的數據。這裏,表a與b不容許執行FULL OUTER JOIN、RIGHT OUTER JOIN。

BUCKET Map Side JOIN

咱們先看兩個表a和b的DDL,表a爲:

1 CREATE TABLE a(key INT, othera STRING)
2 CLUSTERED BY(keyINTO 4 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY '\001'
5 COLLECTION ITEMS TERMINATED BY '\002'
6 MAP KEYS TERMINATED BY '\003'
7 STORED AS SEQUENCEFILE;

表b爲:

1 CREATE TABLE b(key INT, otherb STRING)
2 CLUSTERED BY(keyINTO 32 BUCKETS
3 ROW FORMAT DELIMITED
4 FIELDS TERMINATED BY '\001'
5 COLLECTION ITEMS TERMINATED BY '\002'
6 MAP KEYS TERMINATED BY '\003'
7 STORED AS SEQUENCEFILE;

如今要基於a.key和b.key進行JOIN操做,此時JOIN列同時也是BUCKET列,JOIN語句以下:

1 SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM JOIN ON a.key = b.key

而且表a有4個BUCKET,表b有32個BUCKET,默認狀況下,對於表a的每個BUCKET,都會去獲取表b中的每個BUCKET來進行JOIN,這回形成必定的開銷,由於只有表b中知足JOIN條件的BUCKET纔會真正與表a的BUCKET進行鏈接。
這種默認行爲能夠進行優化,經過改變默認JOIN行爲,只須要設置變量:

1 set hive.optimize.bucketmapjoin = true

這樣,JOIN的過程是,表a的BUCKET 1只會與表b中的BUCKET 1進行JOIN,而再也不考慮表b中的其餘BUCKET 2~32。
若是上述表具備相同的BUCKET,如都是32個,並且仍是排序的,亦即,在表定義中在CLUSTERED BY(key)後面增長以下約束:

1 SORTED BY(key)

則上述JOIN語句會執行一個Sort-Merge-Bucket (SMB) JOIN,一樣須要設置以下參數來改變默認行爲,優化JOIN時只遍歷相關的BUCKET便可:

1 set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
2 set hive.optimize.bucketmapjoin = true;
3 set hive.optimize.bucketmapjoin.sortedmerge = true;
相關文章
相關標籤/搜索