Hive 已經是目前業界最爲通用、廉價的構建大數據時代數據倉庫的解決方案了,雖然也有 Impala 等後起之秀,但目前從功能、穩定性等方面來講,Hive 的地位尚不可撼動。 html
其實這篇博文主要是想聊聊 SMB join 的,Join 是整個 MR/Hive 最爲核心的部分之一,是每一個 Hadoop/Hive/DW RD 必須掌握的部分,以前也有幾篇文章聊到過 MR/Hive 中的 join,其實底層都是相同的,只是上層作了些封裝而已,若是你還不瞭解究竟 Join 有哪些方式,以及底層怎麼實現的,請參考以下連接: android
http://my.oschina.net/leejun2005/blog/95186 MapReduce 中的兩表 join 幾種方案簡介 sql
http://my.oschina.net/leejun2005/blog/111963 Hadoop 多表 join:map side join 範例 app
http://my.oschina.net/leejun2005/blog/158491 Hive & Performance 學習筆記 ide
在最後一篇連接中,有這麼兩副圖: 函數
前面兩個很好理解,基本上每一個人都會接觸到,但最後一種,可能有同窗仍是比較陌生,SMB 存在的目的主要是爲了解決大表與大表間的 Join 問題,分桶其實就是把大表化成了「小表」,而後 Map-Side Join 解決之,這是典型的分而治之的思想。在聊 SMB Join 以前,咱們仍是先複習下相關的基礎概念。 oop
一、Hive 分區表
在Hive Select查詢中通常會掃描整個表內容,會消耗不少時間作不必的工做。有時候只須要掃描表中關心的一部分數據,所以建表時引入了partition概念。分區表指的是在建立表時指定的partition的分區空間。
Hive能夠對數據按照某列或者某些列進行分區管理,所謂分區咱們能夠拿下面的例子進行解釋。
當前互聯網應用天天都要存儲大量的日誌文件,幾G、幾十G甚至更大都是有可能。存儲日誌,其中必然有個屬性是日誌產生的日期。在產生分區時,就能夠按照日誌產生的日期列進行劃分。把每一天的日誌看成一個分區。
將數據組織成分區,主要能夠提升數據的查詢速度。至於用戶存儲的每一條記錄到底放到哪一個分區,由用戶決定。即用戶在加載數據的時候必須顯示的指定該部分數據放到哪一個分區。
1.1 實現細節
一、一個表能夠擁有一個或者多個分區,每一個分區以文件夾的形式單獨存在表文件夾的目錄下。
二、表和列名不區分大小寫。
三、分區是以字段的形式在表結構中存在,經過describe table命令能夠查看到字段存在,
可是該字段不存放實際的數據內容,僅僅是分區的表示(僞列)
。
1.2 語法
1. 建立一個分區表,以 ds 爲分區列:
create table invites (id int, name string) partitioned by (ds string) row format delimited fields terminated by 't' stored as textfile;
2. 將數據添加到時間爲 2013-08-16 這個分區中:
load data local inpath '/home/hadoop/Desktop/data.txt' overwrite into table invites partition (ds='2013-08-16');
3. 將數據添加到時間爲 2013-08-20 這個分區中:
load data local inpath '/home/hadoop/Desktop/data.txt' overwrite into table invites partition (ds='2013-08-20');
4. 從一個分區中查詢數據:
select * from invites where ds ='2013-08-12';
5. 往一個分區表的某一個分區中添加數據:
insert overwrite table invites partition (ds='2013-08-12') select id,max(name) from test group by id;
能夠查看分區的具體狀況,使用命令:
hadoop fs -ls /home/hadoop.hive/warehouse/invites
或者:
show partitions tablename;
二、Hive 桶
對於每個表(table)或者分區,
Hive能夠進一步組織成桶,也就是說桶是更爲細粒度的數據範圍劃分。Hive也是
針對某一列進行桶的組織。Hive採用對列值哈希,而後除以桶的個數求餘的方式決定該條記錄存放在哪一個桶當中。
把表(或者分區)組織成桶(Bucket)有兩個理由: 佈局
(1)得到更高的查詢處理效率。桶爲表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,鏈接兩個在(包含鏈接列的)相同列上劃分了桶的表,可使用 Map 端鏈接 (Map-side join)高效的實現。好比JOIN操做。對於JOIN操做兩個表有一個相同的列,若是對這兩個表都進行了桶操做。那麼將保存相同列值的桶進行JOIN操做就能夠,能夠大大較少JOIN的數據量。 學習
(2)使取樣(sampling)更高效。在處理大規模數據集時,在開發和修改查詢的階段,若是能在數據集的一小部分數據上試運行查詢,會帶來不少方便。 大數據
1. 建立帶桶的 table :
create table bucketed_user(id int,name string) clustered by (id) sorted by(name) into 4 buckets row format delimited fields terminated by '\t' stored as textfile;
首先,咱們來看如何告訴Hive—個表應該被劃分紅桶。咱們使用CLUSTERED BY 子句來指定劃分桶所用的列和要劃分的桶的個數:
CREATE TABLE bucketed_user (id INT) name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;
在這裏,咱們使用用戶ID來肯定如何劃分桶(Hive使用對值進行哈希並將結果除 以桶的個數取餘數。這樣,任何一桶裏都會有一個隨機的用戶集合(PS:其實也能說是隨機,不是嗎?)。
對於map端鏈接的狀況,兩個表以相同方式劃分桶。處理左邊表內某個桶的 mapper知道右邊表內相匹配的行在對應的桶內。所以,mapper只須要獲取那個桶 (這只是右邊表內存儲數據的一小部分)便可進行鏈接。這一優化方法並不必定要求 兩個表必須桶的個數相同,兩個表的桶個數是倍數關係也能夠。用HiveQL對兩個劃分了桶的表進行鏈接,可參見「map鏈接」部分(P400)。
桶中的數據能夠根據一個或多個列另外進行排序。因爲這樣對每一個桶的鏈接變成了高效的歸併排序(merge-sort), 所以能夠進一步提高map端鏈接的效率。如下語法聲明一個表使其使用排序桶:
CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;
咱們如何保證表中的數據都劃分紅桶了呢?把在Hive外生成的數據加載到劃分紅 桶的表中,固然是能夠的。其實讓Hive來劃分桶更容易。這一操做一般針對已有的表。
Hive並不檢查數據文件中的桶是否和表定義中的桶一致(不管是對於桶 的數量或用於劃分桶的列)。若是二者不匹配,在査詢時可能會碰到錯 誤或未定義的結果。所以,建議讓Hive來進行劃分桶的操做。
有一個沒有劃分桶的用戶表:
hive> SELECT * FROM users;
0 Nat
2 Doe
B Kay
4 Ann
2. 強制多個 reduce 進行輸出:
要向分桶表中填充成員,須要將 hive.enforce.bucketing 屬性設置爲 true。①這
樣,Hive 就知道用表定義中聲明的數量來建立桶。而後使用 INSERT 命令便可。須要注意的是:
clustered by和sorted by不會影響數據的導入,這意味着,用戶必須本身負責數據如何如何導入,包括數據的分桶和排序。
'set hive.enforce.bucketing = true' 能夠自動控制上一輪reduce的數量從而適配bucket的個數,固然,用戶也能夠自主設置mapred.reduce.tasks去適配bucket個數,推薦使用'set hive.enforce.bucketing = true'
3. 往表中插入數據:
INSERT OVERWRITE TABLE bucketed_users SELECT * FROM users;
物理上,每一個桶就是表(或分區)目錄裏的一個文件。它的文件名並不重要,可是桶 n 是按照字典序排列的第 n 個文件。事實上,桶對應於 MapReduce 的輸出文件分區:一個做業產生的桶(輸出文件)和reduce任務個數相同。咱們能夠經過查看剛纔 建立的bucketd_users表的佈局來了解這一狀況。運行以下命令:
4. 查看錶的結構:
hive> dfs -ls /user/hive/warehouse/bucketed_users;
將顯示有4個新建的文件。文件名以下(文件名包含時間戳,由Hive產生,所以 每次運行都會改變):
attempt_201005221636_0016_r_000000_0
attempt_201005221636_0016_r-000001_0
attempt_201005221636_0016_r_000002_0
attempt_201005221636_0016_r_000003_0
第一個桶裏包括用戶IDO和4,由於一個INT的哈希值就是這個整數自己,在這裏 除以桶數(4)之後的餘數:②
5. 讀取數據,看每個文件的數據:
hive> dfs -cat /user/hive/warehouse/bucketed_users/*0_0;
0 Nat
4 Ann
用TABLESAMPLE子句對錶進行取樣,咱們能夠得到相同的結果。這個子句會將 查詢限定在表的一部分桶內,而不是使用整個表:
6. 對桶中的數據進行採樣:
hive> SELECT * FROM bucketed_users
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
0 Nat
4 Ann
桶的個數從1開始計數。所以,前面的查詢從4個桶的第一個中獲取全部的用戶。 對於一個大規模的、均勻分佈的數據集,這會返回表中約四分之一的數據行。咱們 也能夠用其餘比例對若干個桶進行取樣(由於取樣並非一個精確的操做,所以這個 比例不必定要是桶數的整數倍)。例如,下面的查詢返回一半的桶:
7. 查詢一半返回的桶數:
hive> SELECT * FROM bucketed_users
> TABLESAMPLE(BUCKET 1 OUT OF 2 ON id);
0 Nat
4 Ann
2 Joe
由於查詢只須要讀取和TABLESAMPLE子句匹配的桶,因此取樣分桶表是很是高效
的操做。若是使用rand()函數對沒有劃分紅桶的表進行取樣,即便只須要讀取很
小一部分樣本,也要掃描整個輸入數據集:
hive〉 SELECT * FROM users
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());
2 Doe
①從Hive 0.6.0開始,對之前的版本,必須把mapred.reduce .tasks設爲表中要填 充的桶的個數。若是桶是排序的,還須要把hive.enforce.sorting設爲true。
②顯式原始文件時,由於分隔字符是一個不能打印的控制字符,所以字段都擠在一塊兒。
三、舉個完整的小栗子:
(1)建student & student1 表:
create table student(id INT, age INT, name STRING)
partitioned by(stat_date STRING)
clustered by(id) sorted by(age) into 2 buckets
row format delimited fields terminated by ',';
create table student1(id INT, age INT, name STRING)
partitioned by(stat_date STRING)
clustered by(id) sorted by(age) into 2 buckets
row format delimited fields terminated by ',';
(2)設置環境變量:
set hive.enforce.bucketing = true;
(3)插入數據:
cat bucket.txt
1,20,zxm
2,21,ljz
3,19,cds
4,18,mac
5,22,android
6,23,symbian
7,25,wp
LOAD DATA local INPATH '/home/lijun/bucket.txt' OVERWRITE INTO TABLE student partition(stat_date="20120802");
from student
insert overwrite table student1 partition(stat_date="20120802")
select id,age,name where stat_date="20120802" sort by age;
(4)查看文件目錄:
hadoop fs -ls /hive/warehouse/test.db/student1/stat_date=20120802
Found 2 items
-rw-r--r-- 2 lijun supergroup 31 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000000_0
-rw-r--r-- 2 lijun supergroup 39 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000001_0
(5)查看sampling數據:
hive> select * from student1 tablesample(bucket 1 out of 2 on id);
Total MapReduce jobs = 1
Launching Job 1 out of 1
.......
OK
4 18 mac 20120802
2 21 ljz 20120802
6 23 symbian 20120802
Time taken: 20.608 seconds
注:tablesample是抽樣語句,語法:TABLESAMPLE(BUCKET x OUT OF y)
y必須是table總bucket數的倍數或者因子。hive根據y的大小,決定抽樣的比例。例如,table總共分了64份,當y=32時,抽取(64/32=)2個bucket的數據,當y=128時,抽取(64/128=)1/2個bucket的數據。x表示從哪一個bucket開始抽取。例如,table總bucket數爲32,tablesample(bucket 3 out of 16),表示總共抽取(32/16=)2個bucket的數據,分別爲第3個bucket和第(3+16=)19個bucket的數據。
四、Refer:
http://rdc.taobao.org/?p=1457 從MR到Hive – 一次遷移的過程
http://blog.573114.com/Blog/Html/A031/516857.html Hadoop權威指南 第12章 Hive簡介 P384
http://superlxw1234.iteye.com/blog/1545150 hive--Sort Merge Bucket Map Join
http://blog.csdn.net/yfkiss/article/details/7816916