前言
算是對在滴滴實習的這段時間Hive的筆記吧,回學校也有段時間了,應該整理整理了,確定不會鉅細無遺,做爲一種學習記錄或者入門指南吧css
基礎
SQL基本語法
Python基礎語法(HiveStreaming會用到)
Java基礎語法(寫UDF會用到)
Hadoop基礎(畢竟mapred過程)
什麼是Hive?
hive是基於Hadoop的一個數據倉庫工具,能夠將結構化的數據文件映射爲一張數據庫表,並提供完整的sql查詢功能,能夠將sql語句轉換爲MapReduce任務進行運行。其優勢是學習成本低,能夠經過類SQL語句快速實現簡單的MapReduce統計,沒必要開發專門的MapReduce應用,十分適合數據倉庫的統計分析。html
建議閱讀:java
大數據時代的技術hive:hive介紹
寫的比較好的Hive介紹1
寫的比較好的Hive介紹2
Hive的基本操做
Hive建庫建表
第一步:建數據庫python
CREATE DATABASE IF NOT EXISTS test COMMENT '添加對錶的描述'
第二步:建table表android
1
2
3
4
5
6
7
8
9
10
11
12
13
# 把本地數據put到集羣
$hadoop fs -put /Users/mrlevo/Desktop/project/
163 music/music_data /test/music/
# 其中hdfs上的數據是這樣的,它會location到該路徑下的全部文件
$hadoop fs -ls /test/music/
-rw-r--r--
1 mac supergroup
2827582 2017 -
07 -
07 15 :
03 /test/music/music_data
# music_data裏面的文件是這樣的,這裏把我的信息抹掉了
$ hadoop fs -cat /test/music/music_data | more
xxx|
9 |讓音樂串起你我|雲南省|文山壯族苗族自治州|
75 後|新浪微博|
482 |
2002 |
326 |http:
xx|
8 |None|雲南省|曲靖市|
75 後|None|
0 |
12 |
4 |http:
xx|
8 |百年雲煙只過眼,不爲繁華易素心|貴州省|貴陽市|
85 後|None|
1 |
22 |
1 |http:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
hive>
create external table test
.163 music(
> nickname
string ,
> stage
string ,
> introduce
string ,
> province
string ,
> city
string ,
> age
string ,
> social
string ,
> trends
string ,
> follow
string ,
> fans
string ,
> homepage
string )
> ROW FORMAT DELIMITED FIELDS TERMINATED BY
'|'
> LOCATION
'/test/music/' ;
OK
Time taken:
0.035 seconds
hive>
select *
from test
.163 music limit
1 ;
OK
xxxxx
9 讓音樂串起你我 雲南省 文山壯族苗族自治州
75 後 新浪微博
482 2002 326 http:
Time taken:
0.065 seconds, Fetched:
1 row(s)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 查看錶結構. 操做表以前,都應該先操做查看錶字段類型,否則進行join的時候可能會產生很大的錯誤
hive> desc test
.163 music;
OK
nickname
string
stage
string
introduce
string
province
string
city
string
age
string
social
string
trends
string
follow
string
fans
string
homepage
string
Time taken:
0.063 seconds, Fetched:
11 row(s)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Hive的數據都是存儲在HDFS上的,默認有一個根目錄,在hive-site.xml中,由參數hive.metastore.warehouse.dir指定。默認值爲/user/hive/warehouse.
hive> show create table test
.163 music;
OK
CREATE EXTERNAL TABLE
`test.163music` (
`nickname` string ,
`stage` string ,
`introduce` string ,
`province` string ,
`city` string ,
`age` string ,
`social` string ,
`trends` string ,
`follow` string ,
`fans` string ,
`homepage` string )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY
'|'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://localhost/test/music'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE' =
'false' ,
'numFiles' =
'0' ,
'numRows' =
'-1' ,
'rawDataSize' =
'-1' ,
'totalSize' =
'0' ,
'transient_lastDdlTime' =
'1499411018' )
Time taken:
0.119 seconds, Fetched:
27 row(s)
注意:除了location 還有另外一種方法把數據掛到hive表裏面,就是put方法,建表的時候,和上面的惟一區別就是沒有location web
首先,查看下本身建的外部表的路徑在哪算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 這裏我用另外一張表,test2
hive> show
create table test.test2;
OK
createtab_stmt
CREATE EXTERNAL TABLE `test2`(
`name` string,
`age` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED
BY ','
STORED
AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs: xxxxx/warehouse/test.db/test2'
TBLPROPERTIES (
'transient_lastDdlTime' =
'1481853187' )
Time taken:
0.132 seconds, Fetched:
13 row(s)
而後將本身的內容put到表中sql
hadoop fs -put
LOCALFILE hdfs:xxxxxx/warehouse/DATABASE/TABLENAME
# LOCALFILE 須要推送的文件
# DATABASE 數據庫名
# TABLENAME 表名
幾個關鍵字
語法關鍵字太多了,我只寫了我之前作過筆記的那幾個,其他的應該查一下就能用,可參考的文章:Hive高級查詢(group by、 order by、 join等) shell
join
這應該是用到最多的語句了,多個表的聯結操做
SELECT a.id,a.dept,b.age # 選擇須要鏈接的東西 FROM a join b ON (a.id = b.id);
實際集羣栗子
首先查看下錶中內容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 這裏我測試了另兩張表
hive>
select * from owntest;
OK
owntest.name owntest.age
shangsan 20
lisi 22
zhouwu 21
Time taken: 0.042 seconds, Fetched: 3 row(s)
hive>
select * from owntest2;
OK
owntest2.workplace owntest2.name
didi shangsan
baidu lisi
wangyi zhouwu
Time taken: 0.349 seconds, Fetched: 3 row(s)
而後開闢隊列 (若是是公司的hive,確定會設置隊列的,本身部門有本身的隊列,有資源分配)
hive>
set mapred
.job .queue .name =xxxxxxxx
而後實現join的操做
hive> select *
from owntest join owntest2
on owntest.
name =owntest2.
name ;
回車以後會顯示進度狀況,以後查看錶便可,這裏我先建了張表來存放結果數據,名字是query_result,使用insert overwrite進行數據複寫到新建表中
hive>
insert overwrite table query_result > select * from owntest join owntest2 on owntest.name=owntest2.name;
Query ID = map_da_20161216130626_40fda9ef-386f-4ef3-9e13-7b08ad69118c
Total jobs = 3
Launching Job 1 out of 3
...
OK
owntest.name owntest.age owntest2.workplace owntest2.name # 這裏join出來爲空值,因此只返回了列名
Time taken: 96.932 seconds
後續操做
也能夠將結果寫入hdfs上的文件中
hive>
insert overwrite directory "/user/xx/xukai/result.txt" > row format delimited fields terminated by "," > select owntest.name,owntest.age,owntest2.workplace from owntest join owntest2 on owntest.name=owntest2.name;
###########
$ hadoop fs -cat /user/xx/xukai/result.txt/000000_0
lisi,22,baidu
shangsan,20,didi
zhouwu,21,wangyi
#值得注意的是,文件只是制定了輸出的路徑,若是沒有文件在,那麼會直接輸出000000_0的文件
固然還有最經常使用的方法,更適合工程項目,使用hive -e 「sql語句」 > 服務器本地輸出路徑
$ hive -e "
set mapred.job.queue.name=root.xxxxx;
use test;
select owntest.name,owntest.age,owntest2.workplace from owntest join owntest2 on owntest.name=owntest2.name " > /data/xx/xukai/result_hive_e4.txt # 上面的話意思是,直接在終端輸入,不用進入hive環境,將其結果輸出到/data/xx/xukai/result_hive_e4.txt路徑下
而後能夠直接下載到本地查看
若是是公司的集羣,確定是掛服務器上跑的,那若是我要線下也就是下載到本身的本機上處理,能夠經過簡單的python語句,便可開啓傳輸服務
# 在本地文件夾直接輸入便可
python -m SimpleHTTPServer
8042
# 在本地經過瀏覽器瀏覽http://線上服務器地址:8042
其餘相似操做,請看Hive join用法 ,這裏沒有存入新表的話,只是顯示結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
hive>
select * from zz0;
111111
222222
888888
hive>
select * from zz1;
111111
333333
444444
888888
hive>
select * from zz0 join zz1 on zz0.uid = zz1.uid;
111111 111111
888888 888888
hive>
select * from zz0 left outer join zz1 on zz0.uid = zz1.uid;
111111 111111
222222 NULL
888888 888888
注意:Hive中Join的關聯鍵必須在ON ()中指定,不能在Where中指定,不然就會先作笛卡爾積,再過濾。join發生在where字句前,因此,若是要限制join的輸出,須要寫在where字句,不然寫在join字句
補充笛卡爾積:假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積爲{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。
若是出現一對多的狀況,假設left join,左表中對應多個右表的值,則相應的錶行會變多,而在右表中沒有對應則輸出爲Null
一個例子,左邊爲table1,右邊爲table2
select * from table1 left outer join table2 on (table1.student_no=table2.student_no);
1
2
3
4
5
6
7
8
9
10
11
12
1 name1
1 11
1 name1
1 12
1 name1
1 13
2 name2
2 11
2 name2
2 14
3 name3
3 15
3 name3
3 12
4 name4
4 13
4 name4
4 12
5 name5
5 14
5 name5
5 16
6 name6
NULL NULL
能夠看到left outer join左邊表的數據都列出來了,若是右邊表沒有對應的列,則寫成了NULL值。
同時注意到,若是左邊的主鍵在右邊找到了N條,那麼結果也是會叉乘獲得N條的,好比這裏主鍵爲1的顯示了右邊的3條。
更多案例:
case when
語法: case when a then b [when c then d]* [else e] end
若是a爲TRUE,則返回b;若是c爲TRUE,則返回d;不然返回e,不少時候在於對數據的改寫操做
hive>
select case when 1 =
2 then 'tom '
when 2 =
2 then 'mary '
else 'tim '
end from lxw_dual;
mary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hive>
select * from owntest;
OK
owntest.name owntest.age
shangsan 20
lisi 22
zhouwu 21
hive>
create table test_case as select name, case when age=21 then 'twenty one ' when age=22 then 'twenty two' else 'twenty' end age_case from owntest;
hive>
select * from test_case;
OK
test_case.name test_case.age_case
shangsan twenty
lisi twenty two
zhouwu twenty one
sum
用於統計的啦,多和group by一塊使用進行分組的一些統計操做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
hive>
select * from owntest;
OK
owntest.name owntest.age
lisi 22
zhangsan 21
zhouwu 21
zhangsan 21
lisi 22
zhouwu 20
xiaoming 10
xiao 10
hive>
create table test_sum as select name,sum (age) from owntest group by name;
hive>
select * from test_sum;
OK
test_sum.name test_sum._c1
lisi 44.0
xiao 10.0
xiaoming 10.0
zhangsan 42.0
zhouwu 41.0
collect_set
簡單來講,相似python 的set,將一個屬性中的全部值放在一個array裏面而後返回去重後的結果,若是不須要去重,可使用collect_list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
hive>
select * from owntest;
OK
owntest.name owntest.age
lisi 22
zhangsan 21
zhouwu 21
zhangsan 21
lisi 22
zhouwu 20
xiaoming 10
xiao 10
hive>
create table test_collect as select name,collect_set(age) age_collect from owntest where age>20 group by name;
hive>
select * from test_collect;
OK
test_collect.name test_collect.age_collect
lisi ["22"]
zhangsan ["21"]
zhouwu ["21"]
# 注意返回了值,應爲限制了條件age>20
綜合一點的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
hive>
select * from owntest;
OK
owntest.name owntest.age owntest.money
lisi 22 1000
zhangsan 22 1000
zhouwu 21 2999
zhangsan 21 3999
lisi 22 2828
zhouwu 20 2992
xiaoming 10 2999
xiao 10 1919
hive>
create table test_sum as select name,collect_set(age) age_collect,sum (money) sum_money from owntest group by name;
hive>
select * from test_sum ;
OK
test_sum.name test_sum.age_collect test_sum.sum_money
lisi [22] 3828
xiao [10] 1919
xiaoming [10] 2999
zhangsan [22,21] 4999
zhouwu [21,20] 5991
concat_ws
contact_ws(seperator, string s1, string s2…)
功能:制定分隔符將多個字符串鏈接起來 例子:經常結合 group by 與 collect_set 使用
col1
col2
col3
a
b
1
a
b
2
a
b
3
c
d
4
c
d
5
c
d
6
變成以下
select col1, col2, concat_ws(',' ,collect_set(col3)) from xxxtable group by col1,col2;
須要注意的是 ,col3必須是string類型的,若是不是,則須要使用cast(col3 as string)強制轉化
order by & sort by & cluster by
select * from baidu_click order by click desc ;
使用distribute和sort進行分組排序
select * from baidu_click distribute by product_line sort by click desc ;
distribute by + sort by就是該替代方案,被distribute by設定的字段爲KEY,數據會被HASH分發到不一樣的reducer機器上,而後sort by會對同一個reducer機器上的每組數據進行局部排序 。
sort by,那麼在每一個reducer端都會作排序,也就是說保證了局部有序,好處是:執行了局部排序以後能夠爲接下去的全局排序提升很多的效率
distribute by:按照指定的字段將數據劃分到不一樣的輸出reduce中,能夠保證每一個Reduce處理的數據範圍不重疊,每一個分區內的數據是沒有排序的。控制着在map端如何分區,按照什麼字段進行分區,要注意均衡。參考Hive:解決Hive建立文件數過多的問題
因爲每組數據是按KEY進行HASH後的存儲而且組內有序,其還能夠有兩種用途:
1) 直接做爲HBASE的輸入源,導入到HBASE;
2) 在distribute+sort後再進行order by階段,實現間接的全局排序;
cluster by,可看作是distribute by和 sort by的組合,如下兩種等價,可是,cluster by指定的列只能是升序,不能指定asc和desc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 栗子
hive>
select * from (select stage ,count (*) as count_num from test.163 music group by stage)a sort by a.count_num desc ;
Query ID = root_20170707154144_04b4e0fa-b63f-4fb3-a836-8c1e48e59aed
Total jobs = 2
Launching Job 1 out of 2
...
OK
6 5211
7 5137
5 3923
8 2437
4 1855
3 1402
2 710
1 686
9 566
0 451
10 36
row_number() over()
From hive如何去掉重複數據,顯示第一條
語法:row_number() over(partition by column order by column)
row_number() over(partition by col1 order by col2)
表示根據col1分組,在分組內部根據 col2排序,而此函數計算的值就表示每組內部排序後的順序編號(組內連續的惟一的)
實例:
步驟一:初始化數據
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
create table employee (empid int ,deptid int ,salary decimal (10 ,2 )) insert into employee values (1 ,10 ,5500.00 ) insert into employee values (2 ,10 ,4500.00 ) insert into employee values (3 ,20 ,1900.00 ) insert into employee values (4 ,20 ,4800.00 ) insert into employee values (5 ,40 ,6500.00 ) insert into employee values (6 ,40 ,14500.00 ) insert into employee values (7 ,40 ,44500.00 ) insert into employee values (8 ,50 ,6500.00 ) insert into employee values (9 ,50 ,7500.00 ) 數據顯示爲 empid deptid salary 1 10 5500.00 2 10 4500.00 3 20 1900.00 4 20 4800.00 5 40 6500.00 6 40 14500.00 7 40 44500.00 8 50 6500.00 9 50 7500.00
需求:根據部門分組,顯示每一個部門的工資等級
預期結果:
empid deptid salary rank
1 10 5500.00 1
2 10 4500.00 2
4 20 4800.00 1
3 20 1900.00 2
7 40 44500.00 1
6 40 14500.00 2
5 40 6500.00 3
9 50 7500.00 1
8 50 6500.00 2
步驟二:實現方式
SELECT *, row_number() over(partition by deptid ORDER BY salary desc ) rank FROM employee
根據deptid進行分組,而後在組內對salary進行降序排序,對此序列,標記等級。
Hive 中級用法及概念
Hive分區表的使用
爲何有分區表?
若是把一年或者一個月的日誌文件存放在一個表下,那麼數據量會很是的大,當查詢這個表中某一天的日誌文件的時候,查詢速度還很是的慢,這時候能夠採用分區表的方式,把這個表根據時間點再劃分爲小表。這樣劃分後,查詢某一個時間點的日誌文件就會快不少,由於這是不須要進行全表掃描。
注意的是!!建表,新手必定要建外部表!!由於涉及到數據元的刪除問題,若是瞎操做,可能你drop table的時候把數據也清空了,特別是內表刪除,hdfs上的數據也會沒的!切記! 什麼是hive的內部表和外部表?
創建分區表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE EXTERNAL TABLE IF NOT EXISTS `test.zzzztp` ( `business_id` bigint, `order_id` bigint,) PARTITIONED BY ( year string ,month string ,day string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster-tj/xxxx/newesid/' ;
# 最後制定的location是會新創建一個文件夾,裏面會存放這個表所指向的數據,分區就在這個文件夾下開始進行
# 注意若是使用hive streaming來解決問題的時候,那種'`'不要出如今字符串中
插入新分區和數據
注意:建立分區後,裏面是沒有分區的,須要插入分區的操做,而插入分區的操做須要根據hdfs上的目錄進行對應的改變,否則插入的分區沒法獲取數據
alter table test.zzzztp add if not exists partition (year ='2016' , month ='08' , day ='31' ) location 'hdfs://mycluster-tj/xxxx/newesid/20160831' ;
注意:使用python自動化腳本插入方法會更快,注意寫好邏輯
例子:hive表運行結果插入結果表的對應分區
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#
insert data into middle table set mapred.job.queue.name=xxxxxx;
USE xxx_table;
ALTER TABLE tablexxx ADD IF NOT EXISTS PARTITION (YEAR = '2016' ,MONTH = '10' ,DAY = '15' ) LOCATION '/user/xxxx/2016/10/15' ;
insert overwrite table tablexxx partition(YEAR = '2016' ,MONTH = '10' ,DAY = '15' ) # 新添加新分區,而後插入數據到相應的分區 select distinct a.business_id, a.order_id, a.origin_id, b.category as category1 , a.destination_id, c.category as category2 from (select * from test.zzzztp where concat(year ,month ,day )='20161015' ) a join poi_data_base.poi_category b on a.origin_id = b.poi_id join poi_data_base.poi_category c on a.destination_id = c.poi_id ;
Hive桶的使用
本段引自: 大數據時代的技術hive:hive的數據類型和數據模型
partition是目錄級別的拆分數據,bucket則是對數據源數據文件自己來拆分數據。使用桶的表會將源數據文件按必定規律拆分紅多個文件,要使用bucket,咱們首先要打開hive對桶的控制,命令以下:
set hive
.enforce .bucketing = true
下面這段文字是我引用博客園裏風生水起的博文
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 示例:
# 建臨時表student_tmp,並導入數據:
hive> desc student_tmp;
OK
id int
age int
name string
stat_date string
Time taken: 0.106 seconds
hive>
select * from student_tmp;
OK
1 20 zxm 20120801
2 21 ljz 20120801
3 19 cds 20120801
4 18 mac 20120801
5 22 android 20120801
6 23 symbian 20120801
7 25 wp 20120801
Time taken: 0.123 seconds
# 建student表:
hive>
create table student(id INT , age INT , name STRING) >partitioned by (stat_date STRING) >clustered by (id) sorted by (age) into 2 bucket >row format delimited fields terminated by ',' ;
# 設置環境變量:
>
set hive.enforce.bucketing = true ;
# 插入數據:
>from student_tmp
>
insert overwrite table student partition(stat_date="20120802" ) >select id,age,name where stat_date="20120801" sort by age;
查看文件目錄
$ hadoop fs
-ls /user/hive/warehouse/studentstat_date
= 20120802 /
Found
2 items
-rw -r -- r
-- 1 work supergroup
31 2012 - 07 - 31 19 :
52 /user/hive/warehouse/student/stat_date
= 20120802 /
000000 _0
-rw -r -- r
-- 1 work supergroup
39 2012 - 07 - 31 19 :
52 /user/hive/warehouse/student/stat_date
= 20120802 /
000001 _0
物理上,每一個桶就是表(或分區)目錄裏的一個文件,桶文件是按指定字段值進行hash,而後除以桶的個數例如上面例子2,最後去結果餘數,由於整數的hash值就是整數自己,上面例子裏,字段hash後的值仍是字段自己,因此2的餘數只有兩個0和1,因此咱們看到產生文件的後綴是0_0和1_0,文件裏存儲對應計算出來的元數據。
Hive的桶,我我的認爲沒有特別的場景或者是特別的查詢,咱們能夠沒有必要使用,也就是不用開啓hive的桶的配置。由於桶運用的場景有限,一個是作map鏈接的運算,我在後面的文章裏會講到,一個就是取樣操做了,下面仍是引用風生水起博文裏的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
查看sampling數據:
hive>
select * from student tablesample(bucket 1 out of 2 on id);
Total MapReduce jobs = 1
Launching Job 1 out of 1
.......
OK
4 18 mac 20120802
2 21 ljz 20120802
6 23 symbian 20120802
Time taken: 20.608 seconds
# tablesample是抽樣語句,語法:TABLESAMPLE(BUCKET x OUT OF y)
y必須是table總bucket數的倍數或者因子。hive根據y的大小,決定抽樣的比例。例如,table總共分了64份,當y=32時,抽取 (64/32=)2個bucket的數據,當y=128時,抽取(64/128=)1/2個bucket的數據。x表示從哪一個bucket開始抽取。例 如,table總bucket數爲32,tablesample(bucket 3 out of 16),表示總共抽取(32/16=)2個bucket的數據,分別爲第3個bucket和第(3+16=)19個bucket的數據。
Hive 數據傾斜的問題
傾斜的緣由
可參考:hive大數據傾斜總結
使map的輸出數據更均勻的分佈到reduce中去,是咱們的最終目標。因爲Hash算法的侷限性,按key Hash會或多或少的形成數據傾斜。大量經驗代表數據傾斜的緣由是人爲的建表疏忽或業務邏輯能夠規避的。
解決方案
驅動表:使用大表作驅動表,以防止內存溢出;Join最右邊的表是驅動表;Mapjoin無視join順序,用大表作驅動表;StreamTable。也就是大表放join右邊,小表放左邊
設置自動處理傾斜隊列:set hive.groupby.skewindata = true;
生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不一樣的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做。
對於group by或distinct,設置此項
設置頂層的聚合操做(top-levelaggregation operation),是指在group by語句以前執行的聚合操做:SET hive.map.aggr=true
Hive UDF
當Hive提供的內置函數沒法知足你的業務處理須要時,此時就能夠考慮使用用戶自定義函數(UDF:user-defined function)。至於如何寫UDF,我沒試過,直接用的公司庫裏面寫好的,你能夠參考Hive內置運算函數,自定義函數(UDF)和Transform
1.add jar hdfs:/xxxx/aaa.jar;
2.
create temporary function aaa as 'xxxx' ;
3.
select aaa(oid) oid from database .table -- 就像函數同樣使用就能夠了
Hive Streaming
Hive提供了另外一種數據處理方式——Streaming,這樣就能夠不須要編寫Java代碼了,其實Streaming處理方式能夠支持不少語言。可是,Streaming的執行效率一般比對應編寫的UDF或改寫InputFormat對象的方式要低。管道中序列化而後反序列化數據一般時低效的。並且以一般的方式很難調試整個程序。
簡單說就是,使用腳原本處理數據
Hive中提供了多種語法來使用Streaming
map()
reduce()
transform()
可是,注意map()實際上並不是在Mapper階段執行Streaming,正如reduce()實際上並不是在reducer階段執行Streaming。所以,相同的功能,一般建議使用transform()語句,這樣能夠避免產生疑惑。
Hive Streaming實際操做
hive streamming 方式 將python程序分發到每一個reducer,對每一個reducer中的數據應用該程序。
使用自帶的cat,來試試,USING什麼腳本均可以
hive>
select transform(owntest.name,owntest.age)
>
using '/bin/cat' as name,age
>
from owntest;
Total MapReduce CPU Time Spent: 1 seconds 980 msec
OK
name age
shangsan 20
lisi 22
zhouwu 21
Time taken: 22.042 seconds, Fetched: 3 row(s)
使用Python腳本進行的Streaming操做
很好的文章值得實驗:hive組合python: Transform的使用
select transform(salary) using 'python /root/experiment/hive/sum.py' as total // 調用外部python程序 from employee;
使用using來加載須要執行的python腳本
import sys
for lines
in sys.stdin:
newline=lines.split(
',' )[
0 ]
newlinestr=
"\t" .join(newline)
print >> sys.stdout, newlinestr
注意!加載的python註釋裏面也不能出現中文字符!除非添加# -*- coding:utf-8 -*-
,py腳本的存儲位置,最好寫絕對路徑
1
2
3
4
5
6
7
8
9
10
11
12
13
# 栗子,將數據流以hive streaming的方式傳遞進去,解析的時候按照文本流的方式進行解析,善用python字符處理
hive -e "
set mapred.job.queue.name=xxxxxx; # 設置隊列
add file findnavitype.py; # 將程序加載到分佈式緩存中
drop table xx.xxxx;
create table xx.xxxx as select TRANSFORM(h.order_id,h.navitype_list,h.city_list) USING 'python findnavitype.py' as (order_id,navitype,cityname) from table h
hive streaming實際使用中的一些技巧
由於加載python文件而後輸出流實際上遵循的select xx from xxtabl的原則,而xx就是通過python處理獲得,因此複雜的運算直接拋給python便可,而後輸出的字段只要看as(這裏)就能夠了。
不要往python裏面拋無用的字段,若是計算量x 小還不影響,可是對於計算量很是大,影響效果很大,特別是字段是字符串形式的。
加載python文件,若使用hive -e 「sql」來使用,則須要在python文件存在的路徑下使用
執行Hive方式
在終端執行Hive
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
hive -e "
set mapred.job.queue.name = xxxxxxx;
use test;
select a.name, a.age, b.workplace from owntest a join owntest2 b on a.name = b.name and a.age > 20 -- where a.age > 20 -- can use where but only behand the join " > /data/xxxx/xukai/result_hive_e6.txt
注意點
兩種方式來挑選數據,where或者在join中直接過濾
在hive -e中的sql語句註釋只能爲--
,其他的如 #
和//
不能用
Python自動化實現Hive相關腳本
最核心的仍是能夠在終端實現hive -e 「sql」 語句,直接再封裝上一層python,而後python file.py 便可。而後調用保存爲.q
的寫好的hive操做語句,可使用替換符來實現更多的自定義操做,使用raw_input
來獲取輸入數據,捕捉後再當作變量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os
if __name__ ==
'__main__' :
day = raw_input(
"please enter the day you want to join(like 20161014)" )
m_table = raw_input(
"please enter the hive table store datasource(like database.table):" )
result_table = raw_input(
"please enter the hive table store result(like database.table):" )
ql_createtable = open(
"createtables.q" ,
"r" ).read()
ql_join = open(
"jointables.q" ,
"r" ).read()
ql_createtable_target = ql_createtable.replace(
"{DAY}" ,day).replace(
"{DATABASE_TABLENAME}" ,m_table)
ql_join_target = ql_join.replace(
"{source_table}" ,m_table).replace(
"{result_table}" ,result_table)
os.system(
'hive -e "%s"' % ql_createtable_target)
os.system(
'hive -e "%s"' % ql_join_target)
More
注意括號的中文和英文方式是不一樣的!!!!看起來倒是同樣的!!!,表別名的時候建議貼邊 )table1
,這樣寫就容易觀察是否是中文括號致使的問題。
count(*)
全部值不全爲NULL
時,加1操做,count(1)
無論有沒有值,只要有這條記錄,值就+1,count(col)
中col
列裏面的值爲NULL
,值不會+1,這個列裏面的值不爲NULL
,才+1,union
與union all
前者會把兩個記錄集中相同的記錄合併,然後者不會,性能上前者優
更新
2017.07.09 更新,整理的片斷信息
2017.07.26 更新,增長桶的片斷
2017.08.04 更新,增長UDF操做
2018.04.22 更新,優化結構補充內容,review學習
致謝