數據倉庫的定義:java
數據倉庫VS數據庫:python
OLTP VS OLAP:算法
常規的數倉架構:
sql
爲何建設數據倉庫:數據庫
數據倉庫建設目標:apache
如何實現:編程
數倉建設背景:json
爲何進行數倉分層:bash
常見的分層含義:
服務器
STG層
stg_主題_表內容_分表規則
ODS層
ods_主題_表內容_分表規則
DWD層
dwd_業務描述時間粒度
DWS層
dws_業務描述_時間粒度_sum
DIM層
dim_維度描述
DM層
dm_主題_表內容_分表規則
分層之間的數據流轉:
Hive簡介:
Hive的簡單架構圖:
Hive VS Hadoop:
產品定位
Hive是數據倉庫,爲海量數據的離線分析設計的,不支持OLTP(聯機事務處理所需的關鍵功能ACID,而更接近於OLAP(聯機分析技術)),適給離線處理大數據集。而MySQL是關係型數據庫,是爲實時業務設計的。
可擴展性
Hive中的數據存儲在HDFS(Hadoop的分佈式文件系統),metastore元數據一 般存儲在獨立的關係型數據庫中,而MySQL則是服務器本地的文件系統。所以Hive具備良好的可擴展性,數據庫因爲ACID語義的嚴格限制,擴展性十分有限。
讀寫模式
Hive爲讀時模式,數據的驗證則是在查詢時進行的,這有利於大數據集的導入,讀時模式使數據的加載很是迅速,數據的加載僅是文件複製或移動。MySQL爲寫時模式,數據在寫入數據庫時對照模式檢查。寫時模式有利於提高查詢性能,由於數據庫能夠對列進行索引。
數據更新
Hive是針對數據倉庫應用設計的,而數倉的內容是讀多寫少的,Hive中不支持對數據進行改寫,全部數據都是在加載的時候肯定好的。而數據庫中的數據一般是須要常常進行修改的。
索引
Hive支持索引,可是Hive的索引與關係型數據庫中的索引並不相同,好比,Hive不支持主鍵或者外鍵。Hive提供了有限的索引功能,能夠爲-些字段創建索引,一張表的索引數據存儲在另一張表中。因爲數據的訪問延遲較高,Hive不適合在線數據查詢。數據庫在少星的特定條件的數據訪問中,索引能夠提供較低的延遲。
計算模型
Hive默認使用的模型是MapReduce(也能夠on spark、on tez),而MySQL使用的是本身設計的Executor計算模型
參考:
Hive數據類型:
Hive分區:
Hive經常使用基礎語法:
USE DATABASE_NAME
CREATE DATABASE IF NOT EXISTS DB NAME
DESC DATABASE DB NAME
CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
SELECT * FROM TABLE NAME
ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME
寫個Python腳本生成一些測試數據:
import json import random import uuid name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline') hobby = ('reading', 'play', 'dancing', 'sing') subject = ('math', 'chinese', 'english', 'computer') data = [] for item in name: scores = {key: random.randint(60, 100) for key in subject} data.append("|".join([uuid.uuid4().hex, item, ','.join( random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])])) with open('test.csv', 'w') as f: f.write('\n'.join(data))
執行該腳本,生成測試數據文件:
[root@hadoop01 ~/py-script]# python3 gen_data.py [root@hadoop01 ~/py-script]# ll -h ... -rw-r--r--. 1 root root 745 11月 9 11:09 test.csv [root@hadoop01 ~/py-script]#
咱們能夠看一下生成的數據:
[root@hadoop01 ~/py-script]# cat test.csv f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77 ...
|
符進行分割,前兩個字段都是string
類型,第三個字段是array
類型,第四個字段是map
類型建立測試用的數據庫:
0: jdbc:hive2://localhost:10000> create database hive_test; No rows affected (0.051 seconds) 0: jdbc:hive2://localhost:10000> use hive_test; No rows affected (0.06 seconds) 0: jdbc:hive2://localhost:10000>
建立測試表:
CREATE TABLE test( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
將本地數據加載到Hive中:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test; No rows affected (0.785 seconds) 0: jdbc:hive2://localhost:10000>
查詢數據:
瞭解了Hive中的SQL基本操做以後,咱們來看看Hive是如何將SQL轉換爲MapReduce任務的,整個轉換過程分爲六個階段:
與普通SQL同樣,咱們能夠經過在HQL前面加上explain
關鍵字查看HQL的執行計劃:
explain select * from test where id > 10 limit 1000
Hive會將這條語句解析成一個個的Operator,Operator就是Hive解析以後的最小單元,每一個Operator其實都是對應一個MapReduce任務。例如,上面這條語句被Hive解析後,就是由以下Operator組成:
同時,Hive實現了優化器對這些Operator的順序進行優化,幫助咱們提高查詢效率。Hive中的優化器主要分爲三類:
內部表:
和傳統數據庫的Table概念相似,對應HDFS上存儲目錄,刪除表時,刪除元數據和表數據。內部表的數據,會存放在HDFS中的特定的位置中,能夠經過配置文件指定。當刪除表時,數據文件也會一併刪除。適用於臨時建立的中間表。
外部表:
指向已經存在的HDFS數據,刪除時只刪除元數據信息。適用於想要在Hive以外使用表的數據的狀況,當你刪除External Table時,只是刪除了表的元數據,它的數據並無被刪除。適用於數據多部門共享。建表時使用
create external table
。指定external
關鍵字便可。
分區表:
Partition對應普通數據庫對Partition列的密集索引,將數據按照Partition列存儲到不一樣目錄,便於並行分析,減小數據量。分區表建立表的時候須要指定分區字段。
分區字段與普通字段的區別:分區字段會在HDFS表目錄下生成一個分區字段名稱的目錄,而普通字段則不會,查詢的時候能夠當成普通字段來使用,通常不直接和業務直接相關。
分桶表:
對數據進行hash,放到不一樣文件存儲,方便抽樣和join查詢。能夠將內部表,外部表和分區表進一步組織成桶表,能夠將表的列經過Hash算法進一步分解成不一樣的文件存儲。
對於內部表和外部表的概念和應用場景咱們很容易理解,咱們須要重點關注一下分區表和分桶表。 咱們爲何要創建分區表和分桶表呢?HQL經過where
子句來限制條件提取數據,那麼與其遍歷一張大表,不如將這張大表拆分紅多個小表,並經過合適的索引來掃描表中的一小部分,分區和分桶都是採用了這種理念。
分區會建立物理目錄,而且能夠具備子目錄(一般會按照時間、地區分區),目錄名以 分區名=值
形式命名,例如:create_time=202011
。分區名會做爲表中的僞列,這樣經過where
字句中加入分區的限制能夠在僅掃描對應子目錄下的數據。經過 partitioned by (feld1 type, ...)
建立分區列。
分桶能夠繼續在分區的基礎上再劃分小表,分桶根據哈希值來肯定數據的分佈(即MapReducer中的分區),好比分區下的一部分數據能夠根據分桶再分爲多個桶,這樣在查詢時先計算對應列的哈希值並計算桶號,只須要掃描對應桶中的數據便可。分桶經過clustered by(field) into n buckets
建立。
接下來簡單演示下這幾種表的操做,首先將上一小節生成的測試數據文件上傳到hdfs中:
[root@hadoop01 ~]# hdfs dfs -mkdir /test [root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test [root@hadoop01 ~]# hdfs dfs -ls /test Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv [root@hadoop01 ~]#
建表SQL:
CREATE TABLE test_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
將hdfs數據加載到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table; No rows affected (0.169 seconds) 0: jdbc:hive2://localhost:10000>
查看建立的表存儲在hdfs的哪一個目錄下:
0: jdbc:hive2://localhost:10000> show create table test_table; +----------------------------------------------------+ | createtab_stmt | +----------------------------------------------------+ | CREATE TABLE `test_table`( | | `user_id` string, | | `user_name` string, | | `hobby` array<string>, | | `scores` map<string,int>) | | ROW FORMAT SERDE | | 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' | | WITH SERDEPROPERTIES ( | | 'collection.delim'=',', | | 'field.delim'='|', | | 'line.delim'='\n', | | 'mapkey.delim'=':', | | 'serialization.format'='|') | | STORED AS INPUTFORMAT | | 'org.apache.hadoop.mapred.TextInputFormat' | | OUTPUTFORMAT | | 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' | | LOCATION | | 'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' | | TBLPROPERTIES ( | | 'bucketing_version'='2', | | 'transient_lastDdlTime'='1604893190') | +----------------------------------------------------+ 22 rows selected (0.115 seconds) 0: jdbc:hive2://localhost:10000>
在hdfs中能夠查看到數據文件:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv [root@hadoop01 ~]#
刪除表:
0: jdbc:hive2://localhost:10000> drop table test_table; No rows affected (0.107 seconds) 0: jdbc:hive2://localhost:10000>
查看hdfs會發現該表所對應的存儲目錄也一併被刪除了:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/ Found 2 items drwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table drwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test [root@hadoop01 ~]#
建表SQL,與內部表的區別就在於external
關鍵字:
CREATE external TABLE external_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
將數據文件加載到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table; No rows affected (0.182 seconds) 0: jdbc:hive2://localhost:10000>
此時會發現hdfs中的數據文件會被移動到hive的目錄下:
[root@hadoop01 ~]# hdfs dfs -ls /test [root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv [root@hadoop01 ~]#
刪除表:
0: jdbc:hive2://localhost:10000> drop table external_table; No rows affected (0.112 seconds) 0: jdbc:hive2://localhost:10000>
查看hdfs會發現該表所對應的存儲目錄仍然存在:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv [root@hadoop01 ~]#
建表語句:
CREATE TABLE partition_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) PARTITIONED BY (create_time string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
將數據文件加載到Hive中,並指定分區:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011'); No rows affected (0.747 seconds) 0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012'); No rows affected (0.347 seconds) 0: jdbc:hive2://localhost:10000>
執行以下sql,能夠從不一樣的分區統計結果:
0: jdbc:hive2://localhost:10000> select count(*) from partition_table; +------+ | _c0 | +------+ | 16 | +------+ 1 row selected (15.881 seconds) 0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011'; +------+ | _c0 | +------+ | 8 | +------+ 1 row selected (14.639 seconds) 0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012'; +------+ | _c0 | +------+ | 8 | +------+ 1 row selected (15.555 seconds) 0: jdbc:hive2://localhost:10000>
分區表在hdfs中的存儲結構:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table Found 2 items drwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011 drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012 [root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011 Found 1 items -rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv [root@hadoop01 ~]#
建表語句:
CREATE TABLE bucket_table( user_id string, user_name string, hobby array<string>, scores map<string,integer> ) clustered by (user_name) sorted by (user_name) into 2 buckets ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LINES TERMINATED BY '\n';
將test
表中的數據插入到bucket_table
中:
0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test; No rows affected (17.393 seconds) 0: jdbc:hive2://localhost:10000>
抽樣查詢:
分桶表在hdfs的存儲目錄以下:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table Found 2 items -rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0 -rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0 [root@hadoop01 ~]#
Hive常見內置函數:
查詢引擎都自帶了一部分函數來幫助咱們解決查詢過程中一些複雜的數據計算或者數據轉換操做,可是有時候自帶的函數功能不能知足業務的須要。這時候就須要咱們本身開發自定義的函數來輔助完成了,這就是所謂的用戶自定義函數UDF(User-Defined Functions)。Hive支持三類自定義函數:
UDF函數其實就是一段遵循必定接口規範的程序。在執行過程當中Hive將SQL轉換爲MapReduce程序,在執行過程中在執行咱們的UDF函數。
本小節簡單演示下自定義UDF函數,首先建立一個空的Maven項目,而後添加hive-exec
依賴,版本與你安裝的Hive版本需對應上。完整的pom
文件內容以下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>hive-udf-test</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>
首先建立一個繼承UDF
的類,咱們實現的這個自定義函數功能就是簡單的獲取字段的長度:
package com.example.hive.udf; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public class StrLen extends UDF { public int evaluate(final Text col) { return col.getLength(); } }
以上這種自定義函數只能支持處理普通類型的數據,若是要對複雜類型的數據作處理則須要繼承GenericUDF
,並實現其抽象方法。例如,咱們實現一個對測試數據中的scores
字段求平均值的函數:
package com.example.hive.udf; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.text.DecimalFormat; public class AvgScore extends GenericUDF { /** * 函數的名稱 */ private static final String FUNC_NAME = "AVG_SCORE"; /** * 函數所做用的字段類型,這裏是map類型 */ private transient MapObjectInspector mapOi; /** * 控制精度只返回兩位小數 */ DecimalFormat df = new DecimalFormat("#.##"); @Override public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException { // 在此方法中能夠作一些前置的校驗,例如檢測函數參數個數、檢測函數參數類型 mapOi = (MapObjectInspector) objectInspectors[0]; // 指定函數的輸出類型 return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; } @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { // 函數的核心邏輯,取出map中的value進行求平均值,並返回一個Double類型的結果值 Object o = deferredObjects[0].get(); double v = mapOi.getMap(o).values().stream() .mapToDouble(a -> Double.parseDouble(a.toString())) .average() .orElse(0.0); return Double.parseDouble(df.format(v)); } @Override public String getDisplayString(String[] strings) { return "func(map)"; } }
對項目進行打包,並上傳到服務器中:
[root@hadoop01 ~/jars]# ls hive-udf-test-1.0-SNAPSHOT.jar [root@hadoop01 ~/jars]#
將jar包上傳到hdfs中:
[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs [root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs [root@hadoop01 ~/jars]# hdfs dfs -ls /udfs Found 1 items -rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar [root@hadoop01 ~/jars]#
在Hive中添加該jar包:
0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar; No rows affected (0.022 seconds) 0: jdbc:hive2://localhost:10000>
而後註冊臨時函數,臨時函數只會在當前的session中生效:
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen"; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore"; No rows affected (0.008 seconds) 0: jdbc:hive2://localhost:10000>
使用自定義函數處理:
0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test; +------------+---------+------------+ | user_name | length | avg_score | +------------+---------+------------+ | Tom | 3 | 80.25 | | Jerry | 5 | 77.5 | | Jim | 3 | 83.75 | | Angela | 6 | 84.5 | | Ann | 3 | 90.0 | | Bella | 5 | 69.25 | | Bonnie | 6 | 76.5 | | Caroline | 8 | 84.5 | +------------+---------+------------+ 8 rows selected (0.083 seconds) 0: jdbc:hive2://localhost:10000>
刪除已註冊的臨時函數:
0: jdbc:hive2://localhost:10000> drop temporary function strlen; No rows affected (0.01 seconds) 0: jdbc:hive2://localhost:10000> drop temporary function avg_score; No rows affected (0.009 seconds) 0: jdbc:hive2://localhost:10000>
臨時函數只會在當前的session中生效,若是須要註冊成永久函數則只須要把TEMPORARY
關鍵字給去掉便可。以下所示:
0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar'; No rows affected (0.049 seconds) 0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar'; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000>
刪除永久函數也是把TEMPORARY
關鍵字給去掉便可。以下所示:
0: jdbc:hive2://localhost:10000> drop function strlen; No rows affected (0.031 seconds) 0: jdbc:hive2://localhost:10000> drop function avg_score; No rows affected (0.026 seconds) 0: jdbc:hive2://localhost:10000>
Hive支持的存儲格式:
咱們都知道關係型數據庫基本是使用行式存儲做爲存儲格式,而大數據領域更多的是採用列式存儲,由於大數據分析場景中一般須要讀取大量行,可是隻須要少數的幾個列。這也是爲何一般使用OrcFile做爲Hive的存儲格式的緣由。因而可知,大數據的絕大部分應用場景都是OLAP場景。
不一樣於事務處理(OLTP)的場景,好比電商場景中加購物車、下單、支付等須要在原地進行大量insert、update、delete操做,數據分析(OLAP)場景一般是將數據批量導入後,進行任意維度的靈活探索、BI工具洞察、報表製做等。
數據一次性寫入後,分析師須要嘗試從各個角度對數據作挖掘、分析,直到發現其中的商業價值、業務變化趨勢等信息。這是一個須要反覆試錯、不斷調整、持續優化的過程,其中數據的讀取次數遠多於寫入次數。這就要求底層數據庫爲這個特色作專門設計,而不是盲目採用傳統數據庫的技術架構。
在OLAP場景中,一般存在一張或是幾張多列的大寬表,列數高達數百甚至數千列。對數據分析處理時,選擇其中的少數幾列做爲維度列、其餘少數幾列做爲指標列,而後對全表或某一個較大範圍內的數據作聚合計算。這個過程會掃描大量的行數據,可是隻用到了其中的少數列。而聚合計算的結果集相比於動輒數十億的原始數據,也明顯小得多。
OLTP類業務對於延時(Latency)要求更高,要避免讓客戶等待形成業務損失;而OLAP類業務,因爲數據量很是大,一般更加關注寫入吞吐(Throughput),要求海量數據可以儘快導入完成。一旦導入完成,歷史數據每每做爲存檔,不會再作更新、刪除操做。
OLAP類業務對於事務需求較少,一般是導入歷史日誌數據,或搭配一款事務型數據庫並實時從事務型數據庫中進行數據同步。多數OLAP系統都支持最終一致性。
分析場景下,隨着業務變化要及時調整分析維度、挖掘方法,以儘快發現數據價值、更新業務指標。而數據倉庫中一般存儲着海量的歷史數據,調整代價十分高昂。預先建模技術雖然能夠在特定場景中加速計算,可是沒法知足業務靈活多變的發展需求,維護成本太高。
行式存儲和列式存儲的對比圖:
與行式存儲將每一行的數據連續存儲不一樣,列式存儲將每一列的數據連續存儲。相比於行式存儲,列式存儲在分析場景下有着許多優良的特性:
OrcFile存儲格式:
Orc列式存儲優勢:
除了Orc外,Parquet也是經常使用的列式存儲格式。Orc VS Parquet:
離線數倉:
離線數倉架構:
實時數倉:
實時數倉架構:
Lambda 架構(Lambda Architecture)是由 Twitter 工程師南森·馬茨(Nathan Marz)提出的大數據處理架構。這一架構的提出基於馬茨在 BackType 和 Twitter 上的分佈式數據處理系統的經驗。
Lambda 架構使開發人員可以構建大規模分佈式數據處理系統。它具備很好的靈活性和可擴展性,也對硬件故障和人爲失誤有很好的容錯性。
Lambda 架構總共由三層系統組成:批處理層(Batch Layer),速度處理層(Speed Layer),以及用於響應查詢的服務層(Serving Layer)。
在 Lambda 架構中,每層都有本身所肩負的任務。批處理層存儲管理主數據集(不可變的數據集)和預先批處理計算好的視圖。批處理層使用可處理大量數據的分佈式處理系統預先計算結果。它經過處理全部的已有歷史數據來實現數據的準確性。這意味着它是基於完整的數據集來從新計算的,可以修復任何錯誤,而後更新現有的數據視圖。輸出一般存儲在只讀數據庫中,更新則徹底取代現有的預先計算好的視圖。
速度處理層會實時處理新來的數據。速度層經過提供最新數據的實時視圖來最小化延遲。速度層所生成的數據視圖可能不如批處理層最終生成的視圖那樣準確或完整,但它們幾乎在收到數據後當即可用。而當一樣的數據在批處理層處理完成後,在速度層的數據就能夠被替代掉了。
本質上,速度層彌補了批處理層所致使的數據視圖滯後。好比說,批處理層的每一個任務都須要 1 個小時才能完成,而在這 1 個小時裏,咱們是沒法獲取批處理層中最新任務給出的數據視圖的。而速度層由於可以實時處理數據給出結果,就彌補了這 1 個小時的滯後。
全部在批處理層和速度層處理完的結果都輸出存儲在服務層中,服務層經過返回預先計算的數據視圖或從速度層處理構建好數據視圖來響應查詢。
全部的新用戶行爲數據均可以同時流入批處理層和速度層。批處理層會永久保存數據而且對數據進行預處理,獲得咱們想要的用戶行爲模型並寫入服務層。而速度層也同時對新用戶行爲數據進行處理,獲得實時的用戶行爲模型。
而當「應該對用戶投放什麼樣的廣告」做爲一個查詢(Query)來到時,咱們從服務層既查詢服務層中保存好的批處理輸出模型,也對速度層中處理的實時行爲進行查詢,這樣咱們就能夠獲得一個完整的用戶行爲歷史了。
一個查詢就以下圖所示,既經過批處理層兼顧了數據的完整性,也能夠經過速度層彌補批處理層的高延時性,讓整個查詢具備實時性。
雖然 Lambda 架構使用起來十分靈活,而且能夠適用於不少的應用場景,但在實際應用的時候,Lambda 架構也存在着一些不足,主要表如今它的維護很複雜。
使用 Lambda 架構時,架構師須要維護兩個複雜的分佈式系統,而且保證他們邏輯上產生相同的結果輸出到服務層中。舉個例子吧,咱們在部署 Lambda 架構的時候,能夠部署 Apache Hadoop 到批處理層上,同時部署 Apache Flink 到速度層上。
咱們都知道,在分佈式框架中進行編程實際上是十分複雜的,尤爲是咱們還會針對不一樣的框架進行專門的優化。因此幾乎每個架構師都認同,Lambda 架構在實戰中維護起來具備必定的複雜性。
那要怎麼解決這個問題呢?咱們先來思考一下,形成這個架構維護起來如此複雜的根本緣由是什麼呢?
維護 Lambda 架構的複雜性在於咱們要同時維護兩套系統架構:批處理層和速度層。咱們已經說過了,在架構中加入批處理層是由於從批處理層獲得的結果具備高準確性,而加入速度層是由於它在處理大規模數據時具備低延時性。
那咱們能不能改進其中某一層的架構,讓它具備另一層架構的特性呢?例如,改進批處理層的系統讓它具備更低的延時性,又或者是改進速度層的系統,讓它產生的數據視圖更具準確性和更加接近歷史數據呢?
另一種在大規模數據處理中經常使用的架構——Kappa 架構(Kappa Architecture),即是在這樣的思考下誕生的。
Kappa 架構是由 LinkedIn 的前首席工程師傑伊·克雷普斯(Jay Kreps)提出的一種架構思想。克雷普斯是幾個著名開源項目(包括 Apache Kafka 和 Apache Samza 這樣的流處理系統)的做者之一,也是如今 Confluent 大數據公司的 CEO。
克雷普斯提出了一個改進 Lambda 架構的觀點:
咱們能不能改進 Lambda 架構中速度層的系統性能,使得它也能夠處理好數據的完整性和準確性問題呢?咱們能不能改進 Lambda 架構中的速度層,使它既可以進行實時數據處理,同時也有能力在業務邏輯更新的狀況下從新處理之前處理過的歷史數據呢?
他根據自身多年的架構經驗發現,咱們是能夠作到這樣的改進的。咱們知道像 Apache Kafka 這樣的流處理平臺是具備永久保存數據日誌的功能的。經過Kafka的這一特性,咱們能夠從新處理部署於速度層架構中的歷史數據。
下面我就以 Kafka 爲例來介紹整個全新架構的過程。
第一步,部署 Kafka,並設置數據日誌的保留期(Retention Period)。
這裏的保留期指的是你但願可以從新處理的歷史數據的時間區間。例如,若是你但願從新處理最多一年的歷史數據,那就能夠把 Apache Kafka 中的保留期設置爲 365 天。若是你但願可以處理全部的歷史數據,那就能夠把 Apache Kafka 中的保留期設置爲「永久(Forever)」。
第二步,若是咱們須要改進現有的邏輯算法,那就表示咱們須要對歷史數據進行從新處理。咱們須要作的就是從新啓動一個 Kafka 做業實例(Instance)。這個做業實例將重頭開始,從新計算保留好的歷史數據,並將結果輸出到一個新的數據視圖中。
咱們知道 Kafka 的底層是使用 Log Offset 來判斷如今已經處理到哪一個數據塊了,因此只須要將 Log Offset 設置爲 0,新的做業實例就會重頭開始處理歷史數據。
第三步,當這個新的數據視圖處理過的數據進度遇上了舊的數據視圖時,咱們的應用即可以切換到重新的數據視圖中讀取。
第四步,中止舊版本的做業實例,並刪除舊的數據視圖。
這個架構就如同下圖所示。
與 Lambda 架構不一樣的是,Kappa 架構去掉了批處理層這一體系結構,而只保留了速度層。你只須要在業務邏輯改變又或者是代碼更改的時候進行數據的從新處理。Kappa 架構統一了數據的處理方式,再也不維護離線和實時兩套代碼邏輯。
Kappa 架構也是有着它自身的不足的。由於 Kappa 架構只保留了速度層而缺乏批處理層,在速度層上處理大規模數據可能會有數據更新出錯的狀況發生,這就須要咱們花費更多的時間在處理這些錯誤異常上面。若是需求發生變化或歷史數據須要從新處理都得經過上游重放來完成。而且從新處理歷史的吞吐能力會低於批處理。
還有一點,Kappa 架構的批處理和流處理都放在了速度層上,這致使了這種架構是使用同一套代碼來處理算法邏輯的。因此 Kappa 架構並不適用於批處理和流處理代碼邏輯不一致的場景。