Hive基礎

基礎

創建在Hadoop上的數據倉庫(Hive的表就是HDFS的目錄、數據就是HDFS的文件),定義了相似SQL的查詢語言,經過它來讀寫和管理分佈式存儲的數據。它的底層執行引擎能夠是MapReduce、Spark等(將SQL語句轉化成M/R或者Spark語言)。前端

優勢:簡單SQL,經常使用於數據分析,擴展性好(計算和存儲),統一的元數據管理(它建立的表或其餘組件,如Spark建立的表都是通用的)node

缺點:沒法進行迭代式計算、數據挖掘、延遲高(不用索引、且利用MR,處理小數據沒優點)、調優困難(粒度粗)mysql

概念web

  • 數據倉庫:一個面相主題的(用於推薦、可視化等)、(業務數據、文檔資料等通過ETL後)集成的、不可更新的、隨時間不變化的數據集合,用於支持企業或組織的決策分析處理。有不一樣的服務器,給前端提供不一樣的功能,如數據查詢、報表、分析等。正則表達式

  • 發展階段:簡單報表(平常工做中,彙總數據)、數據集市(根據某業務部門需求進行採集整理)、數據倉庫(按照數據模型,對整個企業的數據進行採集和整理)sql

  • 星型模型和雪花模型shell

    在冗餘能夠接受的前提下,實際運用中星型模型使用更多數據庫

    前者面相主題,非正規化的結構,多維數據集的每個維度都直接與事實表相鏈接,不存在漸變維度,因此數據有必定的冗餘,但效率高。如以商品爲主題,其餘信息包括訂單、客戶、廠家、物流等。冗餘:如客戶地址信息中,相同省份會出現重複的省份名稱。apache

    後者在前者繼承再生成星型,變成有一個或多個維表沒有直接鏈接到事實表上,而是經過其餘維錶鏈接到事實表上。如客戶的其餘信息有家庭信息、地址、教育等信息。經過最大限度地減小數據存儲量以及聯合較小的
    維表來改善查詢性能,去除了數據冗餘。

  • OLTP事務和OLAP分析:操做型處理,針對具體業務在數據庫聯機的平常操做,一般對少數記錄進行查詢、修改,用戶關心的是它的正常運做;分析型處理:針對某些主題的歷史數據進行分析,支持管理決策。

操做型處理 分析型處理
細節的 綜合的或提煉的
實體——關係(E-R)模型 星型模型或雪花模型
存取瞬間數據 存儲歷史數據,不包含最近的數據
可更新的 只讀、只追加
一次操做一個單元 一次操做一個集合
性能要求高,響應時間短 性能要求寬鬆
面向事務 面向分析
一次操做數據量小 一次操做數據量大
支持平常操做 支持決策需求
數據量小 數據量大
客戶訂單、庫存水平和銀行帳戶查詢等 客戶收益分析、市場細分等
  • 倉庫架構

    四層:ODS(臨時存儲層)、PDW(數據倉庫層,清洗事後的,通常遵循第三範式)、DM(數據集市層,面相主題,星形或雪花結構的數據,輕度彙總級、不存在明細數據、覆蓋全部業務)、APP(應用層)

  • 元數據

    技術元數據和業務元數據。

    存儲方式:每個數據集有對應的元數據文件和元數據庫。前者有較強獨立性,但在大規模處理中有大量元數據文件。後者推薦。

架構
訪問:命令行shell, jdbc, web gui
Driver:解釋器(語法)、編譯器(編譯、執行計劃)、優化器(優化計劃)結合元數據,完成HQL查詢語句計劃的生成。生成的查詢計劃存儲在HDFS中,並在隨後由計算引擎來對HDFS進行查詢。
Metastore:元數據(表信息、列信息、分區、數據所在目錄等)存儲在數據庫(mysql、derby、oracle)中。

Hive在集羣當中只充當client,在一臺機器部署就能夠了。

使用

-e:不進入 hive 的交互窗口執行 sql 語句,bin/hive -e "select id from student;"

-f:執行腳本中 sql 語句

查看hdfs:dfs -ls /;

查看歷史:cat .hivehistory

屬性配置

查看配置:set 配置屬性名稱

默認配置文件:hive-default.xml

用戶自定義配置文件:hive-site.xml。用戶自定義配置會覆蓋默認配置

# default倉庫原始位置。
<property>
    <name>hive.metastore.warehouse.dir</name>                   
    <value>/Users/flyang/Documents/self-teaching/Output/Hive</value>
    <description>location of default database for the warehouse</description>
</property>

# 查詢顯示列名
<property> 
    <name>hive.cli.print.header</name> 
    <value>true</value>
</property>

# 顯示當前數據庫
<property> 
    <name>hive.cli.print.current.db</name> 
    <value>true</value>
</property>

hive-log4j.properties:修改log的存放位置hive.log.dir=/opt/module/hive/logs

其餘配置方式

啓動配置hive -hiveconf mapred.reduce.tasks=10;

交互配置set mapred.reduce.tasks=100;(有些log4j配置不適用)

數據類型

Hive 數據類型 Java 數據類型 長度
TINYINT byte 1byte 有符號整數
SMALINT short 2byte 有符號整數
INT int 4byte 有符號整數
BIGINT long 8byte 有符號整數
BOOLEAN boolean 布爾類型,true 或者 false
FLOAT float 單精度浮點數
DOUBLE double 雙精度浮點數
STRING string 字符系列。能夠指定 字符集。可使用單 引號或者雙引號。
TIMESTAMP 時間類型
BINARY 字節數組

對於 Hive 的 String 類型至關於數據庫的 varchar 類型,該類型是一個可變的字符串,不過不能聲明大小上限。

Hive 有三種複雜數據類型 ARRAY、MAP 和 STRUCT。

{
    "name": "songsong", 
    "friends": ["bingbing" , "lili"] , //列表 Array
    "children": {  //鍵值 Map,
        "xiao song": 18 ,
        "xiaoxiao song": 19 
    }
    "address": {  //結構 Struct,
        "street": "hui long guan" , 
        "city": "beijing"
    }
}
# 例子
create table test(
name string,
friends array<string>,
children map<string, int>,
address struct<street:string, city:string>
)
row format delimited fields terminated by ',' 
collection items terminated by '_'
map keys terminated by ':'
lines terminated by '\n';

songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long guan_beijing yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao yang_beijing

load data local inpath '/opt/module/datas/test.txt' into table test; # 不加local就從HDFS加載

# 訪問三種集合列裏的數據,如下分別是 ARRAY,MAP,STRUCT 的訪問方式
select friends[1],children['xiao song'],address.city from test where name="songsong";

數據轉換:

  • 隱式向更大一級轉換,向小的要 CAST('1' AS INT),失敗得null
  • 全部整數類型、FLOAT 和 STRING 類型均可以隱式地轉換成 DOUBLE
  • TINYINT、SMALLINT、INT 均可以轉換爲 FLOAT。
  • BOOLEAN 類型不能夠轉換爲任何其它的類型。

DDL數據定義

數據庫

默認路徑在配置文件中設置

# 建立
create database if not exists db_hive;
# 指定在hdfs上的位置
create database db_hive2 location '/db_hive2.db';
# 修改數據庫屬性
alter database db_hive set dbproperties('createtime'='20170830');
# 查看數據庫,extended是顯示詳細
desc database extended db_hive;
# 顯示數據庫
show databases like 'db_hive*';
# 刪除,cascade是當數據庫不爲空時用
drop database if exists db_hive2 cascade;

建立相關

CREATE [EXTERNAL] TABLE IF NOT EXISTS db_name.table_name
    [(col_name data_type [COMMENT col_comment], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] # 一個分區表對應一個HDFS文件夾
    [CLUSTERED BY (col_name, col_name, ...) # 建立分桶表
    [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] # 不經常使用
    [ROW FORMAT row_format] 
    [STORED AS file_format]
    [LOCATION hdfs_path]

# row_format
DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]

# file_format
SEQUENCEFILE(壓縮用,二進制序列文件)、TEXTFILE(文本)、 RCFILE(列式存儲格式文件)

# 補充
# 根據查詢結果建表
create table if not exists student3 as select id, name from student;
# 根據已存在的表結構建立表
create table if not exists student4 like student;
# 查看錶結構,比庫多個formatted
desc formatted student2;
# 刪除表
drop table dept_partition;
# 刪除數據
truncate table student;

外部表和內部表

在建表的同時指定一個指向實際數據的路徑(LOCATION),Hive 建立內部表(MANAGED_TABLE)時,會將數據移動到數據倉庫指向的路徑;若建立外部表,僅記錄數據所在的路徑,不對數據的位置作任何改變。在刪除表的時候,內部表的元數據和數據會被一塊兒刪除,而外部表只刪除元數據,不刪除數據。

應用:天天將收集到的網站日誌按期流入 HDFS 文本文件。在外部表(原始日誌表)的基礎 上作大量的統計分析,用到的中間表、結果表使用內部表存儲,數據經過 SELECT+INSERT 進入內部表。

分區表

# 例子
create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string) # 二級分區表(month string, day string)
row format delimited fields terminated by '\t';

10  ACCOUNTING  NEW YORK    201707
10  ACCOUNTING  NEW YORK    201708
20  RESEARCH    DALLAS  201707
20  RESEARCH    DALLAS  201708

# 建立表時,partitioned by (month string)
load data local inpath '/opt/module/datas/dept.txt' into table default.dept_partition partition(month='201709'); # 給"month=201709"這個分區表導入。二級分區表(month='201709', day='13')

# 查詢分區表
select * from dept_partition where month='201707' # where month='201709' and day='13'二級分區表
union [ALL | DISTINCT] 
select * from dept_partition where month='201708';

# 建立分區,不加逗號
alter table dept_partition add partition(month='201706') partition(month='201705');

# 刪除,加逗號
alter table dept_partition drop partition (month='201705'), partition (month='201706');

# 查看有多少個分區
show partitions dept_partition;

# 直接上傳到分區目錄上後創建聯繫,三種方法
# 上傳到相應的目錄dept_partition2/month=201709/day=12;
# (1)修復
msck repair table dept_partition2;
# (2)添加分區
alter table dept_partition2 add partition(month='201709', day='11');
# (3)load數據
load data local inpath '/opt/module/datas/dept.txt' into table dept_partition2 partition(month='201709',day='10');

修改表

# 改表名
alter table dept_partition2 rename to dept_partition3;

# 改列名
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]

# 增長和替換列,增長在全部列後面(partition 列前)
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)

分桶

在一個文件夾下多個文件。分區是多個文件夾下各一個文件(沒有設置bucket的話)

create table stu_buck(id int, name string) 
clustered by(id) into 4 buckets
row format delimited fields terminated by '\t';

set hive.enforce.bucketing=true;
set mapreduce.job.reduces=-1;

insert into table stu_buck
select id, name from stu cluster by(id);

# 抽樣,4是table 總 bucket 數的倍數或者因子。
# table 總共分了 4 份,當 y=2 時,抽取(4/2=)2 個 bucket 的數據,當 y=8 時,抽取(4/8=)1/2 個 bucket 的數據。x 的值必須小於等於 y 的值。4 out of 4表示總共抽取(4/4=)1 個 bucket 的數據,抽取第 4 個 bucket 的數據
select * from stu_buck tablesample(bucket x out of y on id);

select * from stu tablesample(0.1 percent) ;不必定適用於全部的文件格式。另外,這種抽樣的最小抽樣單元是一個 HDFS 數據塊,小於返回所有。

DML數據操做

# 加載數據
load data [local] inpath '/opt/module/datas/student.txt' [overwrite] into table student [partition (partcol1=val1,...)];

# 插入數據
insert into table student partition(month='201709') values('1004','wangwu');

insert overwrite table student partition(month='201708') select id, name from student where month='201709';

from student
insert overwrite table student partition(month='201707')
select id, name where month='201709'
insert overwrite table student partition(month='201706') select id, name where month='201709';

# 查詢語句中建立表並加載數據
create table if not exists student3 as select id, name from student;

# 建立表時經過 Location 指定加載數據路徑

# Import 數據到指定 Hive 表中。先用 export 導出後,再將數據導入
import table student2 partition(month='201709') from 'path'

# 導出
# Insert導出,將查詢的結果導出到本地,格式化部分可選,沒有local就到hdfs
insert overwrite local directory '/opt/module/datas/export/student' 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\n' 
select * from student;
# Export 導出到 HDFS 上
export table default.student to '/user/hive/warehouse/export/student';

查詢

比較運算符(有別於MySQL部分)

操做符 支持的數據類型 描述
A<=>B 基本數據類型 若是 A 和 B 都爲 NULL,則返回 TRUE,其餘 的和等號(=)操做符的結果一致,若是任一 爲 NULL 則結果爲 NULL
A RLIKE B, A REGEXP B STRING 類型 B 是一個正則表達式,若是 A 與其匹配,則返回 TRUE;反之返回 FALSE。匹配使用的是 JDK中的正則表達式接口實現的,由於正則也依據其中的規則。例如,正則表達式必須和整個字符串 A 相匹配,而不是隻需與其字符串匹配

% 表明零個或多個字符(任意個字符)。

_ 表明一個字符。

Join

只支持等值鏈接,不支持非等值鏈接。

每對JOIN對象啓動一個MR任務

join中不支持or

笛卡爾積:

(1)省略鏈接條件:select empno, deptno from emp, dept;
(2)鏈接條件無效
(3)全部表中的全部行互相鏈接

排序

Order By:全局排序,一個 MapReduce。可設置set mapreduce.job.reduces=3;,每一個MR內部排序

Distribute By:相似 MR 中 partition,進行分區,結合 sort by 使用。DISTRIBUTE BY 語句要寫在 SORT BY 語句以前。對於 distribute by 進行測試,必定要分配多 reduce 進行處理(set mapreduce.job.reduces=3;),不然沒法看到 distribute by 的效果。

Cluster By:當 distribute by 和 sorts by 字段相同時,可使用 cluster by 方式。只能是倒序排序,不能指定排序規則爲 ASC 或者 DESC。

select * from emp order by sal desc;
# 下面兩個等價
select * from emp cluster by deptno;
select * from emp distribute by deptno sort by deptno;

函數

# 查看系統自帶的函數 
hive> show functions;
# 顯示自帶的函數的用法 
hive> desc function upper;
# 詳細顯示自帶的函數的用法
hive> desc function extended upper;

自定義函數:

(1)繼承 org.apache.hadoop.hive.ql.UDF

(2)須要實現 evaluate 函數;evaluate 函數支持重載;

UDF 必需要有返回類型,能夠返回 null,可是返回類型不能爲 void;

(3)模塊打包,放到hive/lib。臨時添加可用add jar path,永久添加要修改hive-site.xml。..爲其餘包,好比要處理json數據,就要另加一個jar包。

<property>
    <name>hive.aux.jars.path</name>
    <value>file:///opt/module/hive/lib/app_logs_hive.jar,...</value>
</property>

(3)在 hive 的命令行窗口註冊函數

create function getdaybegin AS 'com.my.hive.DayBeginUDF';

(4)驗證函數

登陸mysql -> metastore數據庫 -> FUNCS表

(5)刪函數時,要處在相應的數據庫才刪。drop function getdaybegin;

壓縮和存儲

通常選用:snappy壓縮,orc 或 parquet存儲

壓縮

map端

1)開啓 hive 中間傳輸數據壓縮功能
set hive.exec.compress.intermediate=true;
2)開啓 mapreduce 中 map 輸出壓縮功能
set mapreduce.map.output.compress=true;
3)設置 mapreduce 中 map 輸出數據的壓縮方式
set mapreduce.map.output.compress.codec= ...
reduce端

set hive.exec.compress.output=true;

set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=...

終數據輸出壓縮爲塊壓縮: set mapreduce.output.fileoutputformat.compress.type=BLOCK;

存儲

Hive 支持的存儲數的格式主要有:

行存儲:TEXTFILE(默認格式,數據不作壓縮,磁盤開銷大,數據解析開銷大。可結合 Gzip、Bzip2 使用,但不能並行查詢) 、SEQUENCEFILE

列存儲:ORC、PARQUET(二進制方式存儲,文件中包含元數據,故自解析。按block分,每個行組由一個 Mapper 任務處理)

壓縮比:ORC > Parquet > textFile

查詢速度:ORC > TextFile > Parquet

ORC

由 1 個或多個 stripe 組成,每一個 stripe250MB 大小,這個 Stripe至關於 RowGroup,由Index Data,Row Data 和 Stripe Footer組成。

Index Data: 默認是每隔 1W 行作一個索引。這裏作的索引應該只是記錄某行的各字段在 Row Data 中的 offset。

Row Data:存的是具體的數據,先取部分行,而後對這些行按列進行存儲。對每一個列進行了編碼,分紅多個 Stream 來存儲。

Stripe Footer:存的是各個 Stream 的類型,長度等信息。

每一個文件有一個 File Footer,這裏面存的是每一個 Stripe 的行數,每一個 Column 的數據類型

信息等;每一個文件的尾部是一個 PostScript,這裏面記錄了整個文件的壓縮類型以及 FileFooter 的長度信息等。在讀取文件時,會 seek 到文件尾部讀 PostScript,從裏面解析到 File Footer 長度,再讀 FileFooter,從裏面解析到各個 Stripe 信息,再讀各個 Stripe,即從後往前讀。

PARQUET:(參考原資料)

# 存儲要設置纔會是非壓縮,默認ZLIB壓縮,也支持SNAPPY
stored as orc tblproperties ("orc.compress"="NONE");

調優

  • Fetch 抓取:對某些狀況的查詢能夠沒必要使用 MapReduce 計算

    在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默認是 more,在全局查找、字段查找、limit 查找等都不走mapreduce。

  • 本地模式:當輸入文件個數較小時,在單臺機器上處理全部的任務

    hive.exec.mode.local.auto 的值爲 true

    hive.exec.mode.local.auto.inputbytes.max=128M默認

    hive.exec.mode.local.auto.input.files.max=4默認

  • 表優化

    新版的 hive 已經對小表 JOIN 大表和大表 JOIN 小表進行了優化。小表放在左邊和右邊已經沒有明顯區別

    # 讓小的維度表(1000 條如下的記錄條數)先進內 存
    set hive.auto.convert.join = true; # 默認true
    hive.mapjoin.smalltable.filesize 默認值是25mb
    
    # 過濾null值再join
    insert overwrite table jointable
    select n.* from (
      select * from nullidtable where id is not null 
      ) n left join ori o on n.id = o.id;
    
    # 空key轉換,注意case部分
    set mapreduce.job.reduces = 5;
    insert overwrite table jointable
    select n.* from nullidtable n full join ori o on
    case when n.id is null then concat('hive', rand()) else n.id end = o.id;
  • MapJoin

    不設置就是不一樣join,即在 Reduce 階段完成 join,容易數據傾斜。查看「表優化」第一條。執行過程是Task A的一個MR Local Task(客戶端本地)將小錶轉換成一個HashTable數據結構,寫入本地文件中,而後把文件加載到Distribute Cache。Task B是沒有reducer的任務,裏面每一個mapper都去找Distribute Cache join,而後輸出。

  • Group By

    map端聚合

    hive.map.aggr = true
    #  Map 端進行聚合操做的條目數目
    hive.groupby.mapaggr.checkinterval = 100000
    # 有數據傾斜的時候進行負載均衡
    hive.groupby.skewindata = true # 當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。第一個map的輸出隨機分佈到reduce,而後聚合,輸出結果再分到下一個reduce。
  • Count去重

    # 數據大時不要寫下面的
    select count(distinct id) from bigtable;
    # 寫這個
    select count(id) from (select id from bigtable group by id) a;
  • 笛卡爾積

    儘可能避免笛卡爾積,join 的時候不加 on 條件,或者無效的 on 條件,Hive 只能使用1個 reducer 來完成笛卡爾積

  • 行列過濾

    列處理:在 SELECT 中,只拿須要的列,若是有,儘可能使用分區過濾,少用 SELECT *。

    行處理:在分區剪裁中,當使用外關聯時,若是將副表的過濾條件寫在 Where 後面, 那麼就會先全表關聯,以後再過濾

    # 先子查詢where去掉無謂的關聯
    select b.id from bigtable b
    join (select id from ori where id <= 10 ) o on b.id = o.id;
  • 動態分區調整

    Insert 數據時候,數據庫自動會根據分區字段的值,將數據插入到相應的分區中

    hive.exec.dynamic.partition=true
    # 動態分區的模式,默認 strict,表示必須指定至少一個分區爲靜態分區,nonstrict 模式表示容許全部的動態分區。
    hive.exec.dynamic.partition.mode=nonstrict
    hive.exec.max.dynamic.partitions=1000
    # 一年數據的話, day 字段有 365,最小大於365
    hive.exec.max.dynamic.partitions.pernode=100
    hive.exec.max.created.files=100000
    hive.error.on.empty.partition=false
    
    # 建立一個分區表,一個目標分區表
    insert overwrite table ori_partitioned_target partition (p_time)
    select ...., p_time from ori_partitioned;
  • 數據傾斜

    map數量的決定因素:input 的文件總個數,input 的文件大小,集羣設置的文件塊大小。

    • 合理設置Map數:當小文件多時減小map數,在 map 執行前合併小文件set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;。當map邏輯複雜時,增長map數,根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M ,調低maxSize。set mapreduce.input.fileinputformat.split.maxsize=100;
    • 合理設置 Reduce 數:hive.exec.reducers.bytes.per.reducer=256000000, hive.exec.reducers.max=1009和N=min(參數 2,總輸入數據量/參數 1)?。在mapred-default.xml 中修改默認值。
  • 併發執行

    set hive.exec.parallel=true;, set hive.exec.parallel.thread.number=16;默認8

  • 嚴格模式

    禁止 3 種類型的查詢

    • 對於分區表查詢,除非 where 語句中含有分區字段過濾條件來限制範圍,不然不容許執行,即不容許掃描全部分區。

    • 使用了 order by 語句的查詢,要求必須使用 limit 語句

    • 限制笛卡爾積的查詢

      關係型數據庫,JOIN 查詢的時候能夠不使用 ON 語句而是使用 where 語句,但Hive不行

  • JVM重用

    小文件的場景或 task 特別多的場景,執行時間都很短,每一個task的啓動都是要啓動JVM進程,消耗大。在Hadoop的mapred-site.xml 設置,一般在10~20,根據業務來定。缺點是數據傾斜,或者某個task運行得慢時,task池不釋放資源。

  • 推斷執行

    推斷出運行較慢的任務,爲它啓動一個備份任務,讓兩個任務同時處理同一份數據,並最終選用最早成功的。若是用戶只是由於輸入數據量很大而須要執行長時間,啓動這個功能浪費巨大。

    在mapred-site.xml中設置mapreduce.map.speculative,也有reduce。

  • 執行計劃:explain [extended]

  • 分桶、分區、壓縮

參考:

尚硅谷大數據技術之 Hive

相關文章
相關標籤/搜索