Hive Join Strategies hive的鏈接策略

Common Join

最爲普通的join策略,不受數據量的大小影響,也能夠叫作reduce side join ,最沒效率的一種join 方式. 它由一個mapreduce job 完成. 算法

首先將大表和小表分別進行map 操做, 在map shuffle 的階段每個map output key 變成了table_name_tag_prefix + join_column_value , 可是在進行partition 的時候它仍然只使用join_column_value 進行hash. apache

每個reduce 接受全部的map 傳過來的split , 在reducce 的shuffle 階段,它將map output key 前面的table_name_tag_prefix 給捨棄掉進行比較. 由於reduce 的個數能夠由小表的大小進行決定,因此對於每個節點的reduce 必定能夠將小表的split 放入內存變成hashtable. 而後將大表的每一條記錄進行一條一條的比較. 緩存

真正的Join在reduce階段app

MapJoin

Map Join 的計算步驟分兩步,將小表的數據變成hashtable廣播到全部的map 端,將大表的數據進行合理的切分,而後在map 階段的時候用大表的數據一行一行的去探測(probe) 小表的hashtable. 若是join key 相等,就寫入HDFS. 分佈式

map join 之因此叫作map join 是由於它全部的工做都在map 端進行計算. ide

hive 在map join 上作了幾個優化: oop

hive 0.6 的時候默認認爲寫在select 後面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示進行設定. hive 0.7 的時候這個計算是自動化的,它首先會自動判斷哪一個是小表,哪一個是大表,這個參數由(hive.auto.convert.join=true)來控制. 而後控制小表的大小由(hive.smalltable.filesize=25000000L)參數控制(默認是25M),當小表超過這個大小,hive 會默認轉化成common join. 你能夠查看HIVE-1642. 性能

首先小表的Map 階段它會將本身轉化成MapReduce Local Task ,而後從HDFS 取小表的全部數據,將本身轉化成Hashtable file 並壓縮打包放入DistributedCache 裏面. 優化

目前hive 的map join 有幾個限制,一個是它打算用BloomFilter 來實現hashtable , BloomFilter 大概比hashtable 省8-10倍的內存, 可是BloomFilter 的大小比較難控制. ui

如今DistributedCache 裏面hashtable默認的複製是3份,對於一個有1000個map 的大表來講,這個數字過小,大多數map 操做都等着DistributedCache 複製.

優化後的map-join

Converting Common Join into Map Join

判斷誰是大表誰是小表(小表的標準就是size小於hive.mapjoin.smalltable.filesize的值)

Hive在Compile階段的時候對每個common join會生成一個conditional task,而且對於每個join table,會假設這個table是大表,生成一個mapjoin task,而後把這些mapjoin tasks裝進

conditional task(List<Task<? extends Serializable>> resTasks),同時會映射大表的alias和對應的mapjoin task。在runtime運行時,resolver會讀取每一個table alias對應的input file size,若是小表的file size比設定的threshold要低 (hive.mapjoin.smalltable.filesize,默認值爲25M),那麼就會執行converted mapjoin task。對於每個mapjoin task同時會設置一個backup task,就是先前的common join task,一旦mapjoin task執行失敗了,則會啓用backup task

Performance Bottleneck

性能瓶頸

一、Distributed Cache is the potential performance bottleneck

分佈式緩存是一個潛在的性能瓶頸

A、Large hashtable file will slow down the propagation of Distributed Cache

大的hashtable文件將會減速分佈式緩存的傳播

B、Mappers are waiting for the hashtables file from Distributed Cache

Mapper排隊等待從分佈式緩存獲取hashtables(由於默認一個hashtable緩存是三份,若是mappers數量太多須要一個一個的等待)

二、Compress and archive all the hashtable file into a tar file.

壓縮和歸檔全部的hashtable文件爲一個tar文件。

Bucket Map Join

Why:

Total table/partition size is big, not good for mapjoin.

How:

set hive.optimize.bucketmapjoin = true;

1. Work together with map join

2. All join tables are bucketized, and each small tableʼs bucket number can be divided by big tableʼs bucket number.

全部join的表是bucketized而且小表的bucket數量是大表bucket數量的整數倍

3. Bucket columns == Join columns

hive 建表的時候支持hash 分區經過指定clustered by (col_name,xxx ) into number_buckets buckets 關鍵字.

當鏈接的兩個表的join key 就是bucket column 的時候,就能夠經過

hive.optimize.bucketmapjoin= true

來控制hive 執行bucket map join 了, 須要注意的是你的小表的number_buckets 必須是大表的倍數. 不管多少個表進行鏈接這個條件都必須知足.(其實若是都按照2的指數倍來分bucket, 大表也能夠是小表的倍數,不過這中間須要多計算一次,對int 有效,long 和string 不清楚)

Bucket Map Join 執行計劃分兩步,第一步先將小表作map 操做變成hashtable 而後廣播到全部大表的map端,大表的map端接受了number_buckets 個小表的hashtable並不須要合成一個大的hashtable,直接能夠進行map 操做,map 操做會產生number_buckets 個split,每一個split 的標記跟小表的hashtable 標記是同樣的, 在執行projection 操做的時候,只須要將小表的一個hashtable 放入內存便可,而後將大表的對應的split 拿出來進行判斷,因此其內存限制爲小表中最大的那個hashtable 的大小.

Bucket Map Join 同時也是Map Side Join 的一種實現,全部計算都在Map 端完成,沒有Reduce 的都被叫作Map Side Join ,Bucket 只是hive 的一種hash partition 的實現,另一種固然是值分區.

create table a (xxx) partition by (col_name)

不過通常hive 中兩個表不必定會有同一個partition key, 即便有也不必定會是join key. 因此hive 沒有這種基於值的map side join, hive 中的list partition 主要是用來過濾數據的而不是分區. 兩個主要參數爲(hive.optimize.cp = true 和 hive.optimize.pruner=true)

hadoop 源代碼中默認提供map side join 的實現, 你能夠在hadoop 源碼的src/contrib/data_join/src 目錄下找到相關的幾個類. 其中TaggedMapOutput 便可以用來實現hash 也能夠實現list , 看你本身決定怎麼分區. Hadoop Definitive Guide 第8章關於map side join 和side data distribution 章節也有一個例子示例怎樣實現值分區的map side join.

上圖解釋:b表是大表,a,c是小表而且都是整數倍,將a,c表加入內存先join而後到每一個b表的map去作匹配。

Sort Merge Bucket Map Join

Why:

No limit on file/partition/table size.

How:

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

1.Work together with bucket map join

將bucket加入到map join中

2.Bucket columns == Join columns == sort columns

Bucket Map Join 並無解決map join 在小表必須徹底裝載進內存的限制, 若是想要在一個reduce 節點的大表和小表都不用裝載進內存,必須使兩個表都在join key 上有序才行,你能夠在建表的時候就指定sorted byjoin key 或者使用index 的方式.

作法仍是兩邊要作hash bucket,並且每一個bucket內部要進行排序。這樣一來當兩邊bucket要作局部join的時候,只須要用相似merge sort算法中的merge操做同樣把兩個bucket順序遍歷一遍便可完成,這樣甚至都不用把一個bucket完整的加載成hashtable,這對性能的提高會有很大幫助。

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

Bucket columns == Join columns == sort columns

這樣小表的數據能夠每次只讀取一部分,而後仍是用大表一行一行的去匹配,這樣的join 沒有限制內存的大小. 而且也能夠執行全外鏈接.

Skew Join

Join bottlenecked on the reducer who gets the

skewed key

set hive.optimize.skewjoin = true;

set hive.skewjoin.key = skew_key_threshold

相關文章
相關標籤/搜索