數據倉庫之Hive快速入門 - 離線&實時數倉架構

數據倉庫VS數據庫

數據倉庫的定義:java

  • 數據倉庫是將多個數據源的數據通過ETL(Extract(抽取)、Transform(轉換)、Load(加載))理以後,按照必定的主題集成起來提供決策支持和聯機分析應用的結構化數據環境

數據倉庫VS數據庫:python

  • 數據庫是面向事務的設計,數據倉庫是面向主題設計的
  • 數據庫通常存儲在線交易數據,數據倉庫存儲的通常是歷史數據
  • 數據庫設計是避免冗餘,採用三範式的規則來設計,數據倉庫在設計是有意引入冗餘,採用反範式的方式來設計

OLTP VS OLAP:算法

  • 聯機事務處理OLTP是傳統的關係型數據庫的主要應用,主要是基本的、平常的事務處理,例如銀行交易
  • 聯機分析處理OLAP是數據倉庫系統的主要應用,支持複雜的分析操做,側重決策支持,而且提供直觀易懂的查詢結果

常規的數倉架構:
數據倉庫之Hive快速入門 - 離線&實時數倉架構sql

爲何建設數據倉庫:數據庫

  • 各個業務數據存在不一致,數據關係混亂
  • 業務系統通常針對於OLTP,而數據倉庫能夠實現OLAP分析
  • 數據倉庫是多源的複雜環境,能夠對多個業務的數據進行統一分析

數據倉庫建設目標:apache

  • 集成多源數據,數據來源和去向可追溯,梳理血緣關係
  • 減小重複開發,保存通用型中間數據,避免重複計算
  • 屏蔽底層業務邏輯,對外提供一致的、 結構清晰的數據

如何實現:編程

  • 實現通用型數據ETL工具
  • 根據業務創建合理的數據分層模型

數據倉庫分層建設

數倉建設背景:json

  • 數據建設剛起步,大部分數據通過粗暴的數據接入後直接對接業務
  • 數據建設發展到必定階段,發現數據的使用雜亂無章,各類業務都是從原始數據直接計算而得。
  • 各類重複計算,嚴重浪費了計算資源,須要優化性能

爲何進行數倉分層:bash

  • 清晰數據結構:每一個數據分層都有對應的做用域
  • 數據血緣追蹤:對各層之間的數據錶轉換進行跟蹤,創建血緣關係
  • 減小重複開發:規範數據分層,開發通用的中間層數據
  • 屏蔽原始數據的異常:經過數據分層管控數據質量
  • 屏蔽業務的影響:沒必要改一次業務就須要從新接入數據
  • 複雜問題簡單化:將複雜的數倉架構分解成多個數據層來完成

常見的分層含義:
數據倉庫之Hive快速入門 - 離線&實時數倉架構服務器

STG層

  • 原始數據層:存儲原始數據,數據結構與採集數據一致
  • 存儲週期:保存所有數據
  • 表命名規範:stg_主題_表內容_分表規則

ODS層

  • 數據操做層:對STG層數據進行初步處理,如去除髒數據,去除無用字段.
  • 存儲週期:默認保留近30天數據
  • 表命名規範:ods_主題_表內容_分表規則

DWD層

  • 數據明細層:數據處理後的寬表,目標爲知足80%的業務需求
  • 存儲週期:保留歷史至今全部的數據
  • 表命名規範:dwd_業務描述時間粒度

DWS層

  • 數據彙總層:彙總數據,解決數據彙總計算和數據完整度問題
  • 存儲週期:保留歷史至今全部的數據
  • 表命名規範:dws_業務描述_時間粒度_sum

DIM層

  • 公共維度層:存儲公共的信息數據,用於DWD、DWS的數據關聯
  • 存儲週期:按需存儲,通常保留歷史至今全部的數據
  • 表命名規範:dim_維度描述

DM層

  • 數據集市層:用於BI、多維分析、標籤、數據挖掘等
  • 存儲週期:按需存儲,--般保留歷史至今全部的數據
  • 表命名規範:dm_主題_表內容_分表規則

分層之間的數據流轉:
數據倉庫之Hive快速入門 - 離線&實時數倉架構


Hive是什麼

Hive簡介:

  • Hive是基於Hadoop的數據倉庫工具,提供類SQL語法(HiveQL)
  • 默認以MR做爲計算引擎(也支持其餘計算引擎,例如tez)、HDFS 做爲存儲系統,提供超大數據集的計算/擴展能力
  • Hive是將數據映射成數據庫和一張張的表,庫和表的元數據信息通常存在關係型數據庫

Hive的簡單架構圖:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

Hive VS Hadoop:

  • Hive數據存儲:Hive的數據是存儲在HDFS.上的,Hive的庫和表是對HDFS.上數據的映射
  • Hive元數據存儲:元數據存儲通常在外部關係庫( Mysql )與Presto Impala等共享
  • Hive語句的執行過程:將HQL轉換爲MapReduce任務運行

Hive與關係數據庫Mysql的區別

產品定位

Hive是數據倉庫,爲海量數據的離線分析設計的,不支持OLTP(聯機事務處理所需的關鍵功能ACID,而更接近於OLAP(聯機分析技術)),適給離線處理大數據集。而MySQL是關係型數據庫,是爲實時業務設計的。

可擴展性

Hive中的數據存儲在HDFS(Hadoop的分佈式文件系統),metastore元數據一 般存儲在獨立的關係型數據庫中,而MySQL則是服務器本地的文件系統。所以Hive具備良好的可擴展性,數據庫因爲ACID語義的嚴格限制,擴展性十分有限。

讀寫模式

Hive爲讀時模式,數據的驗證則是在查詢時進行的,這有利於大數據集的導入,讀時模式使數據的加載很是迅速,數據的加載僅是文件複製或移動。MySQL爲寫時模式,數據在寫入數據庫時對照模式檢查。寫時模式有利於提高查詢性能,由於數據庫能夠對列進行索引。

數據更新

Hive是針對數據倉庫應用設計的,而數倉的內容是讀多寫少的,Hive中不支持對數據進行改寫,全部數據都是在加載的時候肯定好的。而數據庫中的數據一般是須要常常進行修改的。

索引

Hive支持索引,可是Hive的索引與關係型數據庫中的索引並不相同,好比,Hive不支持主鍵或者外鍵。Hive提供了有限的索引功能,能夠爲-些字段創建索引,一張表的索引數據存儲在另一張表中。因爲數據的訪問延遲較高,Hive不適合在線數據查詢。數據庫在少星的特定條件的數據訪問中,索引能夠提供較低的延遲。

計算模型

Hive默認使用的模型是MapReduce(也能夠on spark、on tez),而MySQL使用的是本身設計的Executor計算模型

數據倉庫之Hive快速入門 - 離線&實時數倉架構


Hive安裝部署

參考:


Hive基本使用(上)Hive數據類型/分區/基礎語法

Hive數據類型:

  • 基本數據類型:int、 float、 double、 string、 boolean、 bigint等
  • 複雜類型:array、map、 struct

Hive分區:

  • Hive將海量數據按某幾個字段進行分區,查詢時沒必要加載所有數據
  • 分區對應到HDFS就是HDFS的目錄.
  • 分區分爲靜態分區和動態分區兩種

Hive經常使用基礎語法:

  • USE DATABASE_NAME
  • CREATE DATABASE IF NOT EXISTS DB NAME
  • DESC DATABASE DB NAME
  • CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
  • SELECT * FROM TABLE NAME
  • ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME

寫個Python腳本生成一些測試數據:

import json
import random
import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')

data = []
for item in name:
    scores = {key: random.randint(60, 100) for key in subject}
    data.append("|".join([uuid.uuid4().hex, item, ','.join(
        random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))

with open('test.csv', 'w') as f:
    f.write('\n'.join(data))

執行該腳本,生成測試數據文件:

[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root  745 11月  9 11:09 test.csv
[root@hadoop01 ~/py-script]#

咱們能夠看一下生成的數據:

[root@hadoop01 ~/py-script]# cat test.csv 
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...
  • 數據以 | 符進行分割,前兩個字段都是string類型,第三個字段是array類型,第四個字段是map類型

建立測試用的數據庫:

0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000>

建立測試表:

CREATE TABLE test(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將本地數據加載到Hive中:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000>

查詢數據:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

Hive將HQL轉換爲MapReduce的流程

瞭解了Hive中的SQL基本操做以後,咱們來看看Hive是如何將SQL轉換爲MapReduce任務的,整個轉換過程分爲六個階段:

  1. Antr定義SQL的語法規則,完成SQL詞法,語法解析,將SQL 轉化爲抽象語法樹AST Tree
  2. 遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock
  3. 遍歷QueryBlock,翻譯爲執行操做樹OperatorTree
  4. 邏輯層優化器進行OperatorTree變換,合併沒必要要的ReduceSinkOperator,減小shufle數據量
  5. 遍歷OperatorTree,翻譯爲MapReduce任務
  6. 物理層優化器進行MapReduce任務的變換,生成最終的執行計劃

數據倉庫之Hive快速入門 - 離線&實時數倉架構

與普通SQL同樣,咱們能夠經過在HQL前面加上explain關鍵字查看HQL的執行計劃:

explain select * from test where id > 10 limit 1000

Hive會將這條語句解析成一個個的Operator,Operator就是Hive解析以後的最小單元,每一個Operator其實都是對應一個MapReduce任務。例如,上面這條語句被Hive解析後,就是由以下Operator組成:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

同時,Hive實現了優化器對這些Operator的順序進行優化,幫助咱們提高查詢效率。Hive中的優化器主要分爲三類:

  • RBO(Rule-Based Optimizer):基於規則的優化器
  • CBO(Cost-Based Optimizer):基於代價的優化器,這是默認的優化器
  • 動態CBO:在執行計劃生成的過程當中動態優化的方式

Hive基本使用(中)內部表/外部表/分區表/分桶表

內部表:

和傳統數據庫的Table概念相似,對應HDFS上存儲目錄,刪除表時,刪除元數據和表數據。內部表的數據,會存放在HDFS中的特定的位置中,能夠經過配置文件指定。當刪除表時,數據文件也會一併刪除。適用於臨時建立的中間表。

外部表:

指向已經存在的HDFS數據,刪除時只刪除元數據信息。適用於想要在Hive以外使用表的數據的狀況,當你刪除External Table時,只是刪除了表的元數據,它的數據並無被刪除。適用於數據多部門共享。建表時使用create external table。指定external關鍵字便可。

分區表:

Partition對應普通數據庫對Partition列的密集索引,將數據按照Partition列存儲到不一樣目錄,便於並行分析,減小數據量。分區表建立表的時候須要指定分區字段。

分區字段與普通字段的區別:分區字段會在HDFS表目錄下生成一個分區字段名稱的目錄,而普通字段則不會,查詢的時候能夠當成普通字段來使用,通常不直接和業務直接相關。

分桶表:

對數據進行hash,放到不一樣文件存儲,方便抽樣和join查詢。能夠將內部表,外部表和分區表進一步組織成桶表,能夠將表的列經過Hash算法進一步分解成不一樣的文件存儲。

對於內部表和外部表的概念和應用場景咱們很容易理解,咱們須要重點關注一下分區表和分桶表。 咱們爲何要創建分區表和分桶表呢?HQL經過where子句來限制條件提取數據,那麼與其遍歷一張大表,不如將這張大表拆分紅多個小表,並經過合適的索引來掃描表中的一小部分,分區和分桶都是採用了這種理念。

分區會建立物理目錄,而且能夠具備子目錄(一般會按照時間、地區分區),目錄名以 分區名=值 形式命名,例如:create_time=202011。分區名會做爲表中的僞列,這樣經過where字句中加入分區的限制能夠在僅掃描對應子目錄下的數據。經過 partitioned by (feld1 type, ...) 建立分區列。

分桶能夠繼續在分區的基礎上再劃分小表,分桶根據哈希值來肯定數據的分佈(即MapReducer中的分區),好比分區下的一部分數據能夠根據分桶再分爲多個桶,這樣在查詢時先計算對應列的哈希值並計算桶號,只須要掃描對應桶中的數據便可。分桶經過clustered by(field) into n buckets建立。

接下來簡單演示下這幾種表的操做,首先將上一小節生成的測試數據文件上傳到hdfs中:

[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]#

內部表

建表SQL:

CREATE TABLE test_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將hdfs數據加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000>

查看建立的表存儲在hdfs的哪一個目錄下:

0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_table`(                         |
|   `user_id` string,                                |
|   `user_name` string,                              |
|   `hobby` array<string>,                           |
|   `scores` map<string,int>)                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'line.delim'='\n',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1604893190')            |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000>

在hdfs中能夠查看到數據文件:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]#

刪除表:

0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000>

查看hdfs會發現該表所對應的存儲目錄也一併被刪除了:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x   - root supergroup          0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]#

外部表

建表SQL,與內部表的區別就在於external關鍵字:

CREATE external TABLE external_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將數據文件加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000>

此時會發現hdfs中的數據文件會被移動到hive的目錄下:

[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#

刪除表:

0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000>

查看hdfs會發現該表所對應的存儲目錄仍然存在:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#

分區表

建表語句:

CREATE TABLE partition_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將數據文件加載到Hive中,並指定分區:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000>

執行以下sql,能夠從不一樣的分區統計結果:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000>

分區表在hdfs中的存儲結構:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x   - root supergroup          0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]#

分桶表

建表語句:

CREATE TABLE bucket_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

test表中的數據插入到bucket_table中:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000>

抽樣查詢:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

分桶表在hdfs的存儲目錄以下:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r--   1 root supergroup        465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r--   1 root supergroup        281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]#

Hive基本使用(下)內置函數/自定義函數/實現UDF

Hive常見內置函數:

  • 字符串類型:concat、substr、 upper、 lower
  • 時間類型:year、month、 day
  • 複雜類型:size、 get_json_object

查詢引擎都自帶了一部分函數來幫助咱們解決查詢過程中一些複雜的數據計算或者數據轉換操做,可是有時候自帶的函數功能不能知足業務的須要。這時候就須要咱們本身開發自定義的函數來輔助完成了,這就是所謂的用戶自定義函數UDF(User-Defined Functions)。Hive支持三類自定義函數:

  • UDF:普通的用戶自定義函數。用來處理輸入一行,輸出一行的操做,相似Map操做。如轉換字符串大小寫,獲取字符串長度等
  • UDAF:用戶自定義聚合函數(User-defined aggregate function),用來處理輸入多行,輸出一行的操做,相似Reduce操做。好比MAX、COUNT函數。
  • UDTF:用戶自定義表產生函數(User defined table-generating function),用來處理輸入一行,輸出多行(即一個表)的操做, 不是特別經常使用

UDF函數其實就是一段遵循必定接口規範的程序。在執行過程當中Hive將SQL轉換爲MapReduce程序,在執行過程中在執行咱們的UDF函數。

本小節簡單演示下自定義UDF函數,首先建立一個空的Maven項目,而後添加hive-exec依賴,版本與你安裝的Hive版本需對應上。完整的pom文件內容以下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-udf-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

首先建立一個繼承UDF的類,咱們實現的這個自定義函數功能就是簡單的獲取字段的長度:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

    public int evaluate(final Text col) {
        return col.getLength();
    }
}

以上這種自定義函數只能支持處理普通類型的數據,若是要對複雜類型的數據作處理則須要繼承GenericUDF,並實現其抽象方法。例如,咱們實現一個對測試數據中的scores字段求平均值的函數:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

    /**
     * 函數的名稱
     */
    private static final String FUNC_NAME = "AVG_SCORE";

    /**
     * 函數所做用的字段類型,這裏是map類型
     */
    private transient MapObjectInspector mapOi;

    /**
     * 控制精度只返回兩位小數
     */
    DecimalFormat df = new DecimalFormat("#.##");

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 在此方法中能夠作一些前置的校驗,例如檢測函數參數個數、檢測函數參數類型
        mapOi = (MapObjectInspector) objectInspectors[0];
        // 指定函數的輸出類型
        return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // 函數的核心邏輯,取出map中的value進行求平均值,並返回一個Double類型的結果值
        Object o = deferredObjects[0].get();
        double v = mapOi.getMap(o).values().stream()
                .mapToDouble(a -> Double.parseDouble(a.toString()))
                .average()
                .orElse(0.0);

        return Double.parseDouble(df.format(v));
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "func(map)";
    }
}

對項目進行打包,並上傳到服務器中:

[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#

將jar包上傳到hdfs中:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r--   1 root supergroup       4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#

在Hive中添加該jar包:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000>

而後註冊臨時函數,臨時函數只會在當前的session中生效:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000>

使用自定義函數處理:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name  | length  | avg_score  |
+------------+---------+------------+
| Tom        | 3       | 80.25      |
| Jerry      | 5       | 77.5       |
| Jim        | 3       | 83.75      |
| Angela     | 6       | 84.5       |
| Ann        | 3       | 90.0       |
| Bella      | 5       | 69.25      |
| Bonnie     | 6       | 76.5       |
| Caroline   | 8       | 84.5       |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000>

刪除已註冊的臨時函數:

0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000>

臨時函數只會在當前的session中生效,若是須要註冊成永久函數則只須要把TEMPORARY關鍵字給去掉便可。以下所示:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>

刪除永久函數也是把TEMPORARY關鍵字給去掉便可。以下所示:

0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>

Hive存儲結構 - OrcFile

Hive支持的存儲格式:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

  • TextFile是默認的存儲格式,經過簡單的分隔符能夠對csv等類型的文件進行解析。但實際應用中一般都是使用OrcFile格式,由於ORCFile是列式存儲格式,更加適合大數據查詢的場景。

咱們都知道關係型數據庫基本是使用行式存儲做爲存儲格式,而大數據領域更多的是採用列式存儲,由於大數據分析場景中一般須要讀取大量行,可是隻須要少數的幾個列。這也是爲何一般使用OrcFile做爲Hive的存儲格式的緣由。因而可知,大數據的絕大部分應用場景都是OLAP場景。

OLAP場景的特色

讀多於寫

不一樣於事務處理(OLTP)的場景,好比電商場景中加購物車、下單、支付等須要在原地進行大量insert、update、delete操做,數據分析(OLAP)場景一般是將數據批量導入後,進行任意維度的靈活探索、BI工具洞察、報表製做等。

數據一次性寫入後,分析師須要嘗試從各個角度對數據作挖掘、分析,直到發現其中的商業價值、業務變化趨勢等信息。這是一個須要反覆試錯、不斷調整、持續優化的過程,其中數據的讀取次數遠多於寫入次數。這就要求底層數據庫爲這個特色作專門設計,而不是盲目採用傳統數據庫的技術架構。

大寬表,讀大量行可是少許列,結果集較小

在OLAP場景中,一般存在一張或是幾張多列的大寬表,列數高達數百甚至數千列。對數據分析處理時,選擇其中的少數幾列做爲維度列、其餘少數幾列做爲指標列,而後對全表或某一個較大範圍內的數據作聚合計算。這個過程會掃描大量的行數據,可是隻用到了其中的少數列。而聚合計算的結果集相比於動輒數十億的原始數據,也明顯小得多。

數據批量寫入,且數據不更新或少更新

OLTP類業務對於延時(Latency)要求更高,要避免讓客戶等待形成業務損失;而OLAP類業務,因爲數據量很是大,一般更加關注寫入吞吐(Throughput),要求海量數據可以儘快導入完成。一旦導入完成,歷史數據每每做爲存檔,不會再作更新、刪除操做。

無需事務,數據一致性要求低

OLAP類業務對於事務需求較少,一般是導入歷史日誌數據,或搭配一款事務型數據庫並實時從事務型數據庫中進行數據同步。多數OLAP系統都支持最終一致性。

靈活多變,不適合預先建模

分析場景下,隨着業務變化要及時調整分析維度、挖掘方法,以儘快發現數據價值、更新業務指標。而數據倉庫中一般存儲着海量的歷史數據,調整代價十分高昂。預先建模技術雖然能夠在特定場景中加速計算,可是沒法知足業務靈活多變的發展需求,維護成本太高。

行式存儲和列式存儲

行式存儲和列式存儲的對比圖:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

與行式存儲將每一行的數據連續存儲不一樣,列式存儲將每一列的數據連續存儲。相比於行式存儲,列式存儲在分析場景下有着許多優良的特性:

  1. 如前所述,分析場景中每每須要讀大量行可是少數幾個列。在行存模式下,數據按行連續存儲,全部列的數據都存儲在一個block中,不參與計算的列在IO時也要所有讀出,讀取操做被嚴重放大。而列存模式下,只須要讀取參與計算的列便可,極大的減低了IO cost,加速了查詢。
  2. 同一列中的數據屬於同一類型,壓縮效果顯著。列存每每有着高達十倍甚至更高的壓縮比,節省了大量的存儲空間,下降了存儲成本。
  3. 更高的壓縮比意味着更小的data size,從磁盤中讀取相應數據耗時更短。
  4. 自由的壓縮算法選擇。不一樣列的數據具備不一樣的數據類型,適用的壓縮算法也就不盡相同。能夠針對不一樣列類型,選擇最合適的壓縮算法。
  5. 高壓縮比,意味着同等大小的內存可以存放更多數據,系統cache效果更好。

OrcFile

OrcFile存儲格式:
數據倉庫之Hive快速入門 - 離線&實時數倉架構

Orc列式存儲優勢:

  • 查詢時只須要讀取查詢所涉及的列,下降IO消耗,同時保存每一列統計信息,實現部分謂詞下推
  • 每列數據類型一致,可針對不一樣的數據類型採用其高效的壓縮算法
  • 列式存儲格式假設數據不會發生改變,支持分片、流式讀取,更好的適應分佈式文件存儲的特性

除了Orc外,Parquet也是經常使用的列式存儲格式。Orc VS Parquet:

  • OrcFile和Parquet都是Apache的頂級項目
  • Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新
  • Parquet的壓縮能力較高,Orc的查詢效率較高

離線數倉VS實時數倉

數據倉庫之Hive快速入門 - 離線&實時數倉架構

離線數倉:

  • 離線數據倉庫主要基於Hive等技術來構建T+1的離線數據
  • 經過定時任務天天拉取增量數據導入到Hive表中
  • 建立各個業務相關的主題維度數據,對外提供T+1的數據查詢接口

離線數倉架構:

  • 數據源經過離線的方式導入到離線數倉中
  • 數據分層架構:ODS、DWD、 DM
  • 下游應用根據業務需求選擇直接讀取DM

實時數倉:

  • 實時數倉基於數據採集工具,將原始數據寫入到Kafka等數據通道
  • 數據最終寫入到相似於HBase這樣支持快速讀寫的存儲系統
  • 對外提供分鐘級別、甚至秒級別的查詢方案

實時數倉架構:

  • 業務實時性要求的不斷提升,實時處理從次要部分變成了主要部分
  • Lambda架構:在離線大數據架構基礎上加了一個加速層,使用流處理技術完成實時性較高的指標計算
  • Kappa架構:以實時事件處理爲核心,統一數據處理

圖解Lambda架構數據流程

Lambda 架構(Lambda Architecture)是由 Twitter 工程師南森·馬茨(Nathan Marz)提出的大數據處理架構。這一架構的提出基於馬茨在 BackType 和 Twitter 上的分佈式數據處理系統的經驗。

Lambda 架構使開發人員可以構建大規模分佈式數據處理系統。它具備很好的靈活性和可擴展性,也對硬件故障和人爲失誤有很好的容錯性。

Lambda 架構總共由三層系統組成:批處理層(Batch Layer),速度處理層(Speed Layer),以及用於響應查詢的服務層(Serving Layer)。
數據倉庫之Hive快速入門 - 離線&實時數倉架構

在 Lambda 架構中,每層都有本身所肩負的任務。批處理層存儲管理主數據集(不可變的數據集)和預先批處理計算好的視圖。批處理層使用可處理大量數據的分佈式處理系統預先計算結果。它經過處理全部的已有歷史數據來實現數據的準確性。這意味着它是基於完整的數據集來從新計算的,可以修復任何錯誤,而後更新現有的數據視圖。輸出一般存儲在只讀數據庫中,更新則徹底取代現有的預先計算好的視圖。

速度處理層會實時處理新來的數據。速度層經過提供最新數據的實時視圖來最小化延遲。速度層所生成的數據視圖可能不如批處理層最終生成的視圖那樣準確或完整,但它們幾乎在收到數據後當即可用。而當一樣的數據在批處理層處理完成後,在速度層的數據就能夠被替代掉了。

本質上,速度層彌補了批處理層所致使的數據視圖滯後。好比說,批處理層的每一個任務都須要 1 個小時才能完成,而在這 1 個小時裏,咱們是沒法獲取批處理層中最新任務給出的數據視圖的。而速度層由於可以實時處理數據給出結果,就彌補了這 1 個小時的滯後。

全部在批處理層和速度層處理完的結果都輸出存儲在服務層中,服務層經過返回預先計算的數據視圖或從速度層處理構建好數據視圖來響應查詢。

全部的新用戶行爲數據均可以同時流入批處理層和速度層。批處理層會永久保存數據而且對數據進行預處理,獲得咱們想要的用戶行爲模型並寫入服務層。而速度層也同時對新用戶行爲數據進行處理,獲得實時的用戶行爲模型。

而當「應該對用戶投放什麼樣的廣告」做爲一個查詢(Query)來到時,咱們從服務層既查詢服務層中保存好的批處理輸出模型,也對速度層中處理的實時行爲進行查詢,這樣咱們就能夠獲得一個完整的用戶行爲歷史了。

一個查詢就以下圖所示,既經過批處理層兼顧了數據的完整性,也能夠經過速度層彌補批處理層的高延時性,讓整個查詢具備實時性。
數據倉庫之Hive快速入門 - 離線&實時數倉架構


Kappa 架構 VS Lambda

Lambda 架構的不足

雖然 Lambda 架構使用起來十分靈活,而且能夠適用於不少的應用場景,但在實際應用的時候,Lambda 架構也存在着一些不足,主要表如今它的維護很複雜。

使用 Lambda 架構時,架構師須要維護兩個複雜的分佈式系統,而且保證他們邏輯上產生相同的結果輸出到服務層中。舉個例子吧,咱們在部署 Lambda 架構的時候,能夠部署 Apache Hadoop 到批處理層上,同時部署 Apache Flink 到速度層上。

咱們都知道,在分佈式框架中進行編程實際上是十分複雜的,尤爲是咱們還會針對不一樣的框架進行專門的優化。因此幾乎每個架構師都認同,Lambda 架構在實戰中維護起來具備必定的複雜性。

那要怎麼解決這個問題呢?咱們先來思考一下,形成這個架構維護起來如此複雜的根本緣由是什麼呢?

維護 Lambda 架構的複雜性在於咱們要同時維護兩套系統架構:批處理層和速度層。咱們已經說過了,在架構中加入批處理層是由於從批處理層獲得的結果具備高準確性,而加入速度層是由於它在處理大規模數據時具備低延時性。

那咱們能不能改進其中某一層的架構,讓它具備另一層架構的特性呢?例如,改進批處理層的系統讓它具備更低的延時性,又或者是改進速度層的系統,讓它產生的數據視圖更具準確性和更加接近歷史數據呢?

另一種在大規模數據處理中經常使用的架構——Kappa 架構(Kappa Architecture),即是在這樣的思考下誕生的。

Kappa 架構

Kappa 架構是由 LinkedIn 的前首席工程師傑伊·克雷普斯(Jay Kreps)提出的一種架構思想。克雷普斯是幾個著名開源項目(包括 Apache Kafka 和 Apache Samza 這樣的流處理系統)的做者之一,也是如今 Confluent 大數據公司的 CEO。

克雷普斯提出了一個改進 Lambda 架構的觀點:

咱們能不能改進 Lambda 架構中速度層的系統性能,使得它也能夠處理好數據的完整性和準確性問題呢?咱們能不能改進 Lambda 架構中的速度層,使它既可以進行實時數據處理,同時也有能力在業務邏輯更新的狀況下從新處理之前處理過的歷史數據呢?

他根據自身多年的架構經驗發現,咱們是能夠作到這樣的改進的。咱們知道像 Apache Kafka 這樣的流處理平臺是具備永久保存數據日誌的功能的。經過Kafka的這一特性,咱們能夠從新處理部署於速度層架構中的歷史數據。

下面我就以 Kafka 爲例來介紹整個全新架構的過程。

第一步,部署 Kafka,並設置數據日誌的保留期(Retention Period)。

這裏的保留期指的是你但願可以從新處理的歷史數據的時間區間。例如,若是你但願從新處理最多一年的歷史數據,那就能夠把 Apache Kafka 中的保留期設置爲 365 天。若是你但願可以處理全部的歷史數據,那就能夠把 Apache Kafka 中的保留期設置爲「永久(Forever)」。

第二步,若是咱們須要改進現有的邏輯算法,那就表示咱們須要對歷史數據進行從新處理。咱們須要作的就是從新啓動一個 Kafka 做業實例(Instance)。這個做業實例將重頭開始,從新計算保留好的歷史數據,並將結果輸出到一個新的數據視圖中。

咱們知道 Kafka 的底層是使用 Log Offset 來判斷如今已經處理到哪一個數據塊了,因此只須要將 Log Offset 設置爲 0,新的做業實例就會重頭開始處理歷史數據。

第三步,當這個新的數據視圖處理過的數據進度遇上了舊的數據視圖時,咱們的應用即可以切換到重新的數據視圖中讀取。

第四步,中止舊版本的做業實例,並刪除舊的數據視圖。

這個架構就如同下圖所示。
數據倉庫之Hive快速入門 - 離線&實時數倉架構

與 Lambda 架構不一樣的是,Kappa 架構去掉了批處理層這一體系結構,而只保留了速度層。你只須要在業務邏輯改變又或者是代碼更改的時候進行數據的從新處理。Kappa 架構統一了數據的處理方式,再也不維護離線和實時兩套代碼邏輯。

Kappa 架構的不足

Kappa 架構也是有着它自身的不足的。由於 Kappa 架構只保留了速度層而缺乏批處理層,在速度層上處理大規模數據可能會有數據更新出錯的狀況發生,這就須要咱們花費更多的時間在處理這些錯誤異常上面。若是需求發生變化或歷史數據須要從新處理都得經過上游重放來完成。而且從新處理歷史的吞吐能力會低於批處理。

還有一點,Kappa 架構的批處理和流處理都放在了速度層上,這致使了這種架構是使用同一套代碼來處理算法邏輯的。因此 Kappa 架構並不適用於批處理和流處理代碼邏輯不一致的場景。

Lambda VS Kappa

數據倉庫之Hive快速入門 - 離線&實時數倉架構


主流大公司的實時數倉架構

阿里菜鳥實時數倉

數據倉庫之Hive快速入門 - 離線&實時數倉架構
數據倉庫之Hive快速入門 - 離線&實時數倉架構

美團實時數倉

數據倉庫之Hive快速入門 - 離線&實時數倉架構

實時數倉建設特徵

  • 總體架構設計經過分層設計爲OLAP查詢分擔壓力
  • 複雜的計算統一在實時計算層作,避免給OLAP查詢帶來過大的壓力
  • 彙總計算經過OLAP數據查詢引擎進行
  • 整個架構中實時計算通常 是Spark+Flink配合
  • 消息隊列Kafka一家獨大,配合HBase、ES、 Mysq|進行數據落盤
  • OLAP領域Presto、Druid、 Clickhouse、 Greenplum等等層出不窮
相關文章
相關標籤/搜索