深刻理解 Hive 分區分桶 (Inceptor)

分區是hive存放數據的一種方式。將列值做爲目錄來存放數據,就是一個分區。這樣查詢時使用分區列進行過濾,只需根據列值直接掃描對應目錄下的數據,不掃描其餘不關心的分區,快速定位,提升查詢效率。分動態和靜態分區兩種:java

1. 靜態分區:若分區的值是肯定的,那麼稱爲靜態分區。新增分區或者是加載分區數據時,已經指定分區名。node

create table if not exists day_part1(git

uid int,github

uname string數據庫

)apache

partitioned by(year int,month int)緩存

row format delimited fields terminated by '\t'服務器

;網絡

##加載數據指定分區app

load data local inpath '/root/Desktop/student.txt' into table day_part1 partition(year=2017,month=04);

##新增分區指定分區名

alter table day_part1 add partition(year=2017,month=1) partition(year=2016,month=12);

 1. 動態分區:分區的值是非肯定的,由輸入數據來肯定

 2.1 動態分區的相關屬性:

hive.exec.dynamic.partition=true :是否容許動態分區

hive.exec.dynamic.partition.mode=strict :分區模式設置

strict:最少須要有一個是靜態分區

nostrict:能夠所有是動態分區

hive.exec.max.dynamic.partitions=1000 :容許動態分區的最大數量

hive.exec.max.dynamic.partitions.pernode =100 :單個節點上的mapper/reducer容許建立的最大分區

 2.2 動態分區的操做

##建立臨時表

create table if not exists tmp(

uid int,

commentid bigint,

recommentid bigint,

year int,

month int,

day int

)

row format delimited fields terminated by '\t';

##加載數據

load data local inpath '/root/Desktop/comm' into table tmp;

##建立動態分區表

create table if not exists dyp1(

uid int,

commentid bigint,

recommentid bigint

)

partitioned by(year int,month int,day int)

row format delimited fields terminated by '\t'

;

##嚴格模式

insert into table dyp1 partition(year=2016,month,day)

select uid,commentid,recommentid,month,day from tmp;

##非嚴格模式

##設置非嚴格模式動態分區

set hive.exec.dynamic.partition.mode=nostrict;

##建立動態分區表

create table if not exists dyp2(

uid int,

commentid bigint,

recommentid bigint

)

partitioned by(year int,month int,day int)

row format delimited fields terminated by '\t';

##爲非嚴格模式動態分區加載數據

insert into table dyp2 partition(year,month,day)

select uid,commentid,recommentid,year,month,day from tmp;

 3.分區注意細節

(1)、儘可能不要是用動態分區,由於動態分區的時候,將會爲每個分區分配reducer數量,當分區數量多的時候,reducer數量將會增長,對服務器是一種災難。

(2)、動態分區和靜態分區的區別,靜態分區無論有沒有數據都將會建立該分區,動態分區是有結果集將建立,不然不建立。

(3)、hive動態分區的嚴格模式和hive提供的hive.mapred.mode的嚴格模式。

hive提供咱們一個嚴格模式:爲了阻止用戶不當心提交惡意hql

hive.mapred.mode=nostrict : strict

若是該模式值爲strict,將會阻止如下三種查詢:

(1)、對分區表查詢,where中過濾字段不是分區字段。

(2)、笛卡爾積join查詢,join查詢語句,不帶on條件 或者 where條件。

(3)、對order by查詢,有order by的查詢不帶limit語句。

hive中建立分區表沒有什麼複雜的分區類型(範圍分區、列表分區、hash分區、混合分區等)。分區列也不是表中的一個實際的字段,而是一個或者多個僞列。意思是說在表的數據文件中實際上並不保存分區列的信息與數據。
下面的語句建立了一個簡單的分區表:

create table partition_test
(member_id string,
name string
)
partitioned by (
stat_date string,
province string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

這個例子中建立了stat_date和province兩個字段做爲分區列。一般狀況下須要先預先建立好分區,而後才能使用該分區,例如:

alter table partition_test add partition(stat_date='20110728',province='zhejiang');

這樣就建立好了一個分區。這時咱們會看到hive在HDFS存儲中建立了一個相應的文件夾:

$ hadoop fs -ls/user/hive/warehouse/partition_test/stat_date=20110728
Found 1 items
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728/province=zhejiang

每個分區都會有一個獨立的文件夾,下面是該分區全部的數據文件。在這個例子中stat_date是主層次,province是副層次,全部stat_date='20110728',而province不一樣的分區都會在/user/hive/warehouse/partition_test/stat_date=20110728下面,而stat_date不一樣的分區都會在/user/hive/warehouse/partition_test/下面,如:

$ hadoop fs -ls /user/hive/warehouse/partition_test/
Found 2 items
drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728

注意,由於分區列的值要轉化爲文件夾的存儲路徑,因此若是分區列的值中包含特殊值,如 '%', ':', '/','#',它將會被使用%加上2字節的ASCII碼進行轉義,如:

hive> alter table partition_test add partition(stat_date='2011/07/28',province='zhejiang');
OK
Time taken: 4.644 seconds

$hadoop fs -ls /user/hive/warehouse/partition_test/
Found 3 items
drwxr-xr-x - admin supergroup 0 2011-07-29 10:06/user/hive/warehouse/partition_test/stat_date=2011% 2F07%2F28
drwxr-xr-x - admin supergroup 0 2011-07-28 19:46/user/hive/warehouse/partition_test/stat_date=20110526
drwxr-xr-x - admin supergroup 0 2011-07-29 09:53/user/hive/warehouse/partition_test/stat_date=20110728

我使用一個輔助的非分區表partition_test_input準備向partition_test中插入數據:

hive> desc partition_test_input;
OK
stat_date string
member_id string
name string
province string

hive> select * from partition_test_input;
OK
20110526 1 liujiannan liaoning
20110526 2 wangchaoqun hubei
20110728 3 xuhongxing sichuan
20110728 4 zhudaoyong henan
20110728 5 zhouchengyu heilongjiang

而後我向partition_test的分區中插入數據:

hive> insert overwrite table partition_testpartition(stat_date='20110728',province='henan') selectmember_id,name from partition_test_input where stat_date='20110728'and province='henan';
Total MapReduce jobs = 2
...
1 Rows loaded to partition_test
OK

還能夠同時向多個分區插入數據,0.7版本之後不存在的分區會自動建立,0.6以前的版本官方文檔上說必需要預先建立好分區:

hive>
> from partition_test_input
> insert overwrite table partition_test partition(stat_date='20110526',province='liaoning')
> select member_id,name where stat_date='20110526'and province='liaoning'
> insert overwrite table partition_test partition(stat_date='20110728',province='sichuan')
> select member_id,name where stat_date='20110728'and province='sichuan'
> insert overwrite table partition_test partition(stat_date='20110728',province='heilongjiang')
> select member_id,name where stat_date='20110728'and province='heilongjiang';
Total MapReduce jobs = 4
...
3 Rows loaded to partition_test
OK

特別要注意,在其餘數據庫中,通常向分區表中插入數據時系統會校驗數據是否符合該分區,若是不符合會報錯。而在hive中,向某個分區中插入什麼樣的數據徹底是由人來控制的,由於分區鍵是僞列,不實際存儲在文件中,如:


hive> insert overwrite table partition_testpartition(stat_date='20110527',province='liaoning') selectmember_id,name from partition_test_input;
Total MapReduce jobs = 2
...
5 Rows loaded to partition_test
OK

hive> select * from partition_test wherestat_date='20110527' and province='liaoning';
OK
1 liujiannan 20110527 liaoning
2 wangchaoqun 20110527 liaoning
3 xuhongxing 20110527 liaoning
4 zhudaoyong 20110527 liaoning
5 zhouchengyu 20110527 liaoning

能夠看到在partition_test_input中的5條數據有着不一樣的stat_date和province,可是在插入到partition(stat_date='20110527',province='liaoning')這個分區後,5條數據的stat_date和province都變成相同的了,由於這兩列的數據是根據文件夾的名字讀取來的,而不是實際從數據文件中讀取來的:

$ hadoop fs -cat/user/hive/warehouse/partition_test/stat_date=20110527/province=liaoning/000000_0
1,liujiannan
2,wangchaoqun
3,xuhongxing
4,zhudaoyong
5,zhouchengyu

下面介紹一下動態分區,由於按照上面的方法向分區表中插入數據,若是源數據量很大,那麼針對一個分區就要寫一個insert,很是麻煩。何況在以前的版本中,必須先手動建立好全部的分區後才能插入,這就更麻煩了,你必須先要知道源數據中都有什麼樣的數據才能建立分區。
使用動態分區能夠很好的解決上述問題。動態分區能夠根據查詢獲得的數據自動匹配到相應的分區中去。 


使用動態分區要先設置hive.exec.dynamic.partition參數值爲true,默認值爲false,即不容許使用:

hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=false
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=true

動態分區的使用方法很簡單,假設我想向stat_date='20110728'這個分區下面插入數據,至於province插入到哪一個子分區下面讓數據庫本身來判斷,那能夠這樣寫:

hive> insert overwrite table partition_testpartition(stat_date='20110728',province)
> select member_id,name,province frompartition_test_input where stat_date='20110728';
Total MapReduce jobs = 2
...
3 Rows loaded to partition_test
OK

stat_date叫作靜態分區列,province叫作動態分區列。select子句中須要把動態分區列按照分區的順序寫出來,靜態分區列不用寫出來。這樣stat_date='20110728'的全部數據,會根據province的不一樣分別插入到/user/hive/warehouse/partition_test/stat_date=20110728/下面的不一樣的子文件夾下,若是源數據對應的province子分區不存在,則會自動建立,很是方便,並且避免了人工控制插入數據與分區的映射關係存在的潛在風險。

注意,動態分區不容許主分區採用動態列而副分區採用靜態列,這樣將致使全部的主分區都要建立副分區靜態列所定義的分區:

hive> insert overwrite table partition_testpartition(stat_date,province='liaoning')
> select member_id,name,province frompartition_test_input where province='liaoning';
FAILED: Error in semantic analysis: Line 1:48 Dynamic partitioncannot be the parent of a static partition 'liaoning'

動態分區能夠容許全部的分區列都是動態分區列,可是要首先設置一個參數hive.exec.dynamic.partition.mode:

hive> set hive.exec.dynamic.partition.mode;
hive.exec.dynamic.partition.mode=strict

它的默認值是strick,即不容許分區列所有是動態的,這是爲了防止用戶有可能原意是隻在子分區內進行動態建分區,可是因爲疏忽忘記爲主分區列指定值了,這將致使一個dml語句在短期內建立大量的新的分區(對應大量新的文件夾),對系統性能帶來影響。
因此咱們要設置:
hive> sethive.exec.dynamic.partition.mode=nostrick;

再介紹3個參數:
hive.exec.max.dynamic.partitions.pernode (缺省值100):每個mapreducejob容許建立的分區的最大數量,若是超過了這個數量就會報錯
hive.exec.max.dynamic.partitions(缺省值1000):一個dml語句容許建立的全部分區的最大數量
hive.exec.max.created.files (缺省值100000):全部的mapreducejob容許建立的文件的最大數量

當源表數據量很大時,單獨一個mapreducejob中生成的數據在分區列上可能很分散,舉個簡單的例子,好比下面的表要用3個map:
1
1
1
2
2
2
3
3
3

若是數據這樣分佈,那每一個mapreduce只須要建立1個分區就能夠了: 
        |1
map1 --> |1 
        |1 

        |2
map2 --> |2 
        |2 

        |3
map3 --> |3 
        |3
可是若是數據按下面這樣分佈,那第一個mapreduce就要建立3個分區: 

        |1
map1 --> |2 
        |3 

        |1
map2 --> |2 
        |3 

        |1
map3 --> |2 
        |3

下面給出了一個報錯的例子:
hive> sethive.exec.max.dynamic.partitions.pernode=4;
hive> insert overwrite table partition_testpartition(stat_date,province)
> select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province;
Total MapReduce jobs = 1
...
[Fatal Error] Operator FS_4 (id=4): Number of dynamic partitionsexceeded hive.exec.max.dynamic.partitions.pernode.. Killing thejob.
Ended Job = job_201107251641_0083 with errors
FAILED: Execution Error, return code 2 fromorg.apache.hadoop.hive.ql.exec.MapRedTask

爲了讓分區列的值相同的數據儘可能在同一個mapreduce中,這樣每個mapreduce能夠儘可能少的產生新的文件夾,能夠藉助distributeby的功能,將分區列值相同的數據放到一塊兒:

hive> insert overwrite table partition_testpartition(stat_date,province)
> select member_id,name,stat_date,province frompartition_test_input distribute by stat_date,province;
Total MapReduce jobs = 1
...
18 Rows loaded to partition_test
OK

好了,關於hive的分區表先簡單介紹到這裏,後續版本若是有功能的更新我也會再更新。

 

爲什麼分區分桶
咱們知道傳統的DBMS系統通常都具備表分區的功能,經過表分區可以在特定的區域檢索數據,減小掃描成本,在必定程度上提升查詢效率,固然咱們還能夠經過進一步在分區上創建索引進一步提高查詢效率。在此就不贅述了。

在Hive數倉中也有分區分桶的概念,在邏輯上分區表與未分區表沒有區別,在物理上分區表會將數據按照分區鍵的列值存儲在表目錄的子目錄中,目錄名=「分區鍵=鍵值」。其中須要注意的是分區鍵的值不必定要基於表的某一列(字段),它能夠指定任意值,只要查詢的時候指定相應的分區鍵來查詢便可。咱們能夠對分區進行添加、刪除、重命名、清空等操做。由於分區在特定的區域(子目錄)下檢索數據,它做用同DNMS分區同樣,都是爲了減小掃描成本。

分桶則是指定分桶表的某一列,讓該列數據按照哈希取模的方式隨機、均勻地分發到各個桶文件中。由於分桶操做須要根據某一列具體數據來進行哈希取模操做,故指定的分桶列必須基於表中的某一列(字段)。由於分桶改變了數據的存儲方式,它會把哈希取模相同或者在某一區間的數據行放在同一個桶文件中。如此一來即可提升查詢效率,如:咱們要對兩張在同一列上進行了分桶操做的表進行JOIN操做的時候,只須要對保存相同列值的桶進行JOIN操做便可。同時分桶也能讓取樣(Sampling)更高效。

 

分區
Hive(Inceptor)分區又分爲單值分區、範圍分區。單值分區又分爲靜態分區和動態分區。咱們先看下分區長啥樣。以下,假若有一張表名爲persionrank表,記錄每一個人的評級,有id、name、score字段。咱們即可以建立分區rank(注意rank不是表中的列,咱們能夠把它當作虛擬列),並將相應數據導入指定分區(將數據插入指定目錄)。

單值分區
單值分區根據插入時是否須要手動指定分區能夠分爲:單值靜態分區:導入數據時須要手動指定分區。單值動態分區:導入數據時,系統能夠動態判斷目標分區。

單值分區表的建表方式有兩種:直接定義列和 CREATE TABLE LIKE。注意,單值分區表不能用 CREATE
TABLE AS SELECT 建表。而範圍分區表只能經過直接定義列來建表。

一、靜態分區建立

直接在 PARTITIONED BY 後面跟上分區鍵、類型便可。(分區鍵不能和任何列重名)

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type> [, <col_name> <data_type> ...])
    -- 指定分區鍵和數據類型
    PARTITIONED BY  (<partition_key> <data_type>, ...) 
    [CLUSTERED BY ...] 
    [ROW FORMAT <row_format>] 
    [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
二、靜態分區寫入

-- 覆蓋寫入
INSERT OVERWRITE TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...]) 
    SELECT <select_statement>;
 
-- 追加寫入
INSERT INTO TABLE <table_name> 
    PARTITION (<partition_key>=<partition_value>[, <partition_key>=<partition_value>, ...])
    SELECT <select_statement>;
 
 
三、動態分區建立

建立方式與靜態分區表徹底同樣,一張表可同時被靜態和動態分區鍵分區,只是動態分區鍵須要放在靜態分區建的後面(由於HDFS上的動態分區目錄下不能包含靜態分區的子目錄),以下 spk 即 static partition key, dpk 即 dynamic partition key。

CREATE TABLE <table_name>
 PARTITIONED BY ([<spk> <data_type>, ... ,] <dpk> <data_type>, [<dpk>
<data_type>,...]);
-- ...略
四、動態分區寫入

靜態分區鍵要用 <spk>=<value> 指定分區值;動態分區只須要給出分出分區鍵名稱 <dpk>。

-- 開啓動態分區支持,並設置最大分區數
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=2000;
 
-- <dpk>爲動態分區鍵, <spk>爲靜態分區鍵
INSERT (OVERWRITE | INTO) TABLE <table_name>
    PARTITION ([<spk>=<value>, ..., ] <dpk>, [..., <dpk>]) 
    SELECT <select_statement>; 
範圍分區
單值分區每一個分區對應於分區鍵的一個取值,而每一個範圍分區則對應分區鍵的一個區間,只要落在指定區間內的記錄都被存儲在對應的分區下。分區範圍須要手動指定,分區的範圍爲前閉後開區間 [最小值, 最大值)。最後出現的分區可使用 MAXVALUE 做爲上限,MAXVALUE 表明該分區鍵的數據類型所容許的最大
值。

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type>, <col_name> <data_type>, ...)
    PARTITIONED BY RANGE (<partition_key> <data_type>, ...) 
        (PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>), 
            [PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>),
              ...
            ]
            PARTITION [<partition_name>] VALUES LESS THAN (<cutoff>|MAXVALUE) 
        )
    [ROW FORMAT <row_format>] [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
eg:多個範圍分區鍵的狀況:

DROP TABLE IF EXISTS test_demo;
CREATE TABLE test_demo (value INT)
PARTITIONED BY RANGE (id1 INT, id2 INT, id3 INT)
(
-- id1在(--∞,5]之間,id2在(-∞,105]之間,id3在(-∞,205]之間
PARTITION p5_105_205 VALUES LESS THAN (5, 105, 205),
-- id1在(--∞,5]之間,id2在(-∞,105]之間,id3在(205,215]之間
PARTITION p5_105_215 VALUES LESS THAN (5, 105, 215),
PARTITION p5_115_max VALUES LESS THAN (5, 115, MAXVALUE),
PARTITION p10_115_205 VALUES LESS THAN (10, 115, 205),
PARTITION p10_115_215 VALUES LESS THAN (10, 115, 215),
PARTITION pall_max values less than (MAXVALUE, MAXVALUE, MAXVALUE)
);
 

分桶
說完分區,咱們來繼續搞分桶。對Hive(Inceptor)表分桶能夠將表中記錄按分桶鍵的哈希值分散進多個文件中,這些小文件稱爲桶。

建立分桶表
咱們先看一下建立分桶表的建立,分桶表的建表有三種方式:直接建表,CREATE TABLE LIKE 和 CREATE TABLE AS SELECT ,單值分區表不能用 CREATETABLE AS SELECT 建表。這裏以直接建表爲例:

CREATE [EXTERNAL] TABLE <table_name>
    (<col_name> <data_type> [, <col_name> <data_type> ...])]
    [PARTITIONED BY ...] 
    CLUSTERED BY (<col_name>) 
        [SORTED BY (<col_name> [ASC|DESC] [, <col_name> [ASC|DESC]...])] 
        INTO <num_buckets> BUCKETS  
    [ROW FORMAT <row_format>] 
    [STORED AS TEXTFILE|ORC|CSVFILE]
    [LOCATION '<file_path>']    
    [TBLPROPERTIES ('<property_name>'='<property_value>', ...)];
分桶鍵只能有一個即<col_name>。表能夠同時分區和分桶,當表分區時,每一個分區下都會有<num_buckets> 個桶。咱們也能夠選擇使用 SORTED BY … 在桶內排序,排序鍵和分桶鍵無需相同。ASC 爲升序選項,DESC 爲降序選項,默認排序方式是升序。<num_buckets> 指定分桶個數,也就是表目錄下小文件的個數。

向分桶表寫入數據
由於分桶表在建立的時候只會定義Scheme,且寫入數據的時候不會自動進行分桶、排序,須要人工先進行分桶、排序後再寫入數據。確保目標表中的數據和它定義的分佈一致。

目前有兩種方式往分桶表中插入數據:

方法一:打開enforce bucketing開關。

SET hive.enforce.bucketing=true; ①
INSERT (INTO|OVERWRITE) TABLE <bucketed_table> SELECT <select_statement>
[SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; ②
方法二:將reducer個數設置爲目標表的桶數,並在 SELECT 語句中用 DISTRIBUTE BY <bucket_key>對查詢結果按目標表的分桶鍵分進reducer中。

SET mapred.reduce.tasks = <num_buckets>; 
INSERT (INTO|OVERWRITE) TABLE <bucketed_table>
SELECT <select_statement>
DISTRIBUTE BY <bucket_key>, [<bucket_key>, ...] 
[SORT BY <sort_key> [ASC|DESC], [<sort_key> [ASC|DESC], ...]]; 
若是分桶表建立時定義了排序鍵,那麼數據不只要分桶,還要排序
若是分桶鍵和排序鍵不一樣,且按降序排列,使用Distribute by … Sort by分桶排序
若是分桶鍵和排序鍵相同,且按升序排列(默認),使用 Cluster by 分桶排序,即以下:
SET mapred.reduce.tasks = <num_buckets>;
INSERT (INTO|OVERWRITE) TABLE <bucketed_table>
SELECT <select_statement>
CLUSTER BY <bucket_sort_key>, [<bucket_sort_key>, ...];
 

另外補充說明一下,在Hive(Inceptor)中,ORC事務表必須進行分桶(爲了提升效率)。每一個桶的文件大小應在100~200MB之間(ORC表壓縮後的數據)。一般作法是先分區後分桶。
--------------------- 

使用Hive SQL插入動態分區的Parquet表OOM異常分析

舒適提示:要看高清無碼套圖,請使用手機打開並單擊圖片放大查看。 Fayson的github:https://github.com/fayson/cdhproject 提示:代碼塊部分能夠左右滑動查看噢

1.異常描述


當運行「INSERT ... SELECT」語句向Parquet或者ORC格式的表中插入數據時,若是啓用了動態分區,你可能會碰到如下錯誤,而致使做業沒法正常執行。

Hive客戶端:

 
  1. Task with the most failures(4):

  2. Diagnostic Messages for this Task:

  3. Error: GC overhead limit exceeded

  4. ...

  5. FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

  6. MapReduce Jobs Launched:

  7. Stage-Stage-1: Map: 1 HDFS Read: 0 HDFS Write: 0 FAIL

  8. Total MapReduce CPU Time Spent: 0 msec

(可左右滑動)

YARN的8088中查看具體map task報錯:

2017-10-27 17:08:04,317 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded

(可左右滑動)

2.異常分析


Parquet和ORC是列式批處理文件格式。這些格式要求在寫入文件以前將批次的行(batches of rows)緩存在內存中。在執行INSERT語句時,動態分區目前的實現是:至少爲每一個動態分區目錄打開一個文件寫入器(file writer)。因爲這些緩衝區是按分區維護的,所以在運行時所需的內存量隨着分區數量的增長而增長。因此常常會致使mappers或reducers的OOM,具體取決於打開的文件寫入器(file writer)的數量。

經過INSERT語句插入數據到動態分區表中,也可能會超過HDFS同時打開文件數的限制。

若是沒有join或聚合,INSERT ... SELECT語句會被轉換爲只有map任務的做業。mapper任務會讀取輸入記錄而後將它們發送到目標分區目錄。在這種狀況下,每一個mapper必須爲遇到的每一個動態分區建立一個新的文件寫入器(file writer)。mapper在運行時所需的內存量隨着它遇到的分區數量的增長而增長。

3.異常重現與解決

3.1.生成動態分區的幾個參數說明


hive.exec.dynamic.partition

默認值:false

是否開啓動態分區功能,默認false關閉。

使用動態分區時候,該參數必須設置成true;

hive.exec.dynamic.partition.mode

默認值:strict

動態分區的模式,默認strict,表示必須指定至少一個分區爲靜態分區,nonstrict模式表示容許全部的分區字段均可以使用動態分區。

通常須要設置爲nonstrict

hive.exec.max.dynamic.partitions.pernode

默認值:100

在每一個執行MR的節點上,最大能夠建立多少個動態分區。

該參數須要根據實際的數據來設定。

好比:源數據中包含了一年的數據,即day字段有365個值,那麼該參數就須要設置成大於365,若是使用默認值100,則會報錯。

hive.exec.max.dynamic.partitions

默認值:1000

在全部執行MR的節點上,最大一共能夠建立多少個動態分區。

同上參數解釋。

hive.exec.max.created.files

默認值:100000

整個MR Job中,最大能夠建立多少個HDFS文件。

通常默認值足夠了,除非你的數據量很是大,須要建立的文件數大於100000,可根據實際狀況加以調整。

mapreduce.map.memory.mb

map任務的物理內存分配值,常見設置爲1GB,2GB,4GB等。

mapreduce.map.java.opts

map任務的Java堆棧大小設置,通常設置爲小於等於上面那個值的75%,這樣能夠保證map任務有足夠的堆棧外內存空間。

mapreduce.input.fileinputformat.split.maxsize

mapreduce.input.fileinputformat.split.minsize

這個兩個參數聯合起來用,主要是爲了方便控制mapreduce的map數量。好比我設置爲1073741824,就是爲了讓每一個map處理1GB的文件。

3.2.一個例子


Fayson在前兩天給人調一個使用Hive SQL插入動態分區的Parquet表時,老是報錯OOM,也是折騰了好久。如下咱們來看看整個過程。

1.首先咱們看看執行腳本的內容,基本其實就是使用Hive的insert語句將文本數據表插入到另一張parquet表中,固然使用了動態分區。

2.咱們看看原始數據文件,是文本文件,一共120個,每一個30GB大小,總共差很少3.6TB。

3.咱們看看報錯

4.由於是一個只有map的mapreduce任務,當咱們從YARN的8088觀察這個做業時能夠發現,基本沒有一個map可以執行成功,所有都是失敗的。報上面的錯誤。

5.把mapreduce.map.memory.mb從2GB增大到4GB,8GB,16GB,相應mapreduce.map.java.opts增大到3GB,6GB,12GB。依舊報錯OOM。

6.後面又將mapreduce.input.fileinputformat.split.maxsize從1GB,減小爲512MB,256MB,從而增大map數量,縮小單個map處理文件的大小。依舊報錯OOM。

7.最後啓用hive.optimize.sort.dynamic.partition,增長reduce過程,做業執行成功。

8.最後查看結果文件大約1.2TB,約爲輸入文件的三分之一。一共1557個分區,最大的分區文件爲2GB。

4.異常總結


對於這個異常,咱們建議有如下三種方式來處理:

1.啓用hive.optimize.sort.dynamic.partition,將其設置爲true。經過這個優化,這個只有map任務的mapreduce會引入reduce過程,這樣動態分區的那個字段好比日期在傳到reducer時會被排序。因爲分區字段是排序的,所以每一個reducer只須要保持一個文件寫入器(file writer)隨時處於打開狀態,在收到來自特定分區的全部行後,關閉記錄寫入器(record writer),從而減少內存壓力。這種優化方式在寫parquet文件時使用的內存要相對少一些,但代價是要對分區字段進行排序。

 
  1. SET hive.optimize.sort.dynamic.partition=true;

  2. INSERT OVERWRITE TABLE [table] SELECT ...

2.第二種方式就是增長每一個mapper的內存分配,即增大mapreduce.map.memory.mb和mapreduce.map.java.opts,這樣全部文件寫入器(filewriter)緩衝區對應的內存會更充沛。

3.將查詢分解爲幾個較小的查詢,以減小每一個查詢建立的分區數量。這樣可讓每一個mapper打開較少的文件寫入器(file writer)。

備註:

默認狀況下,Hive爲每一個打開的Parquet文件緩衝區(file buffer)分配128MB。這個buffer大小由參數parquet.block.size控制。爲得到最佳性能,parquet的buffer size須要與HDFS的block size保持對齊(好比相等),從而使每一個parquet文件在單個HDFS的塊中,以便每一個I/O請求均可以讀取整個數據文件,而無需經過網絡傳輸訪問後續的block。

 
  1. -- set Parquetbuffer size to 256MB (in bytes)

  2. set parquet.block.size=268435456

相關文章
相關標籤/搜索