本套系列博客從真實商業環境抽取案例進行總結和分享,並給出大數據商業實戰指導,請持續關注本套博客。版權聲明:本套大數據商業實戰系列歸做者(秦凱新)全部,禁止轉載,歡迎學習。git
STAGE 層做爲數據緩衝層,主要負責採集不一樣類型的業務系統數據並保存必定期限內的相關業務數據,完成不一樣類型數據源的統一臨時存儲,同時避免 ETL 操做對業務系統性能形成影響,STAGE 層數據在數據結構、數據之間的邏輯關係上都與業務系統基本保持一致。github
ODS(Operational Data Store)層數據來源於 STAGE 層,它的數據通過了對 STAGE 層數據的清洗,包括編碼表去重、去空、垃圾數據過濾、數據類型規則化等。ODS存儲了運營系統(如OLTP(聯機事務處理)系統)近實時的詳細數據。數據庫
另外 ODS 做爲 DW 和 STAGE 層的橋樑,也能夠實現指標一致性的管理,將不一樣系統不一樣部門相同指標的定義及指標數據按照業務規則取其一,保證不一樣源數據的數據一致性,也能夠知足用戶對明細數據的查詢要求,直接從ODS層獲取明細數據進行分析。數組
DWD(Data Warehouse Detail)層數據是將 ODS 層數據根據數據清洗規則,通過質量檢查、數據清洗、轉換、標準化後,造成符合質量要求的公共數據中心,有的也稱爲 ODS層,是業務層與數據倉庫的隔離層數據結構
把 ODS 數據表結構改變成項目主題數據倉庫的表結構,對 DWD 層的全部表添加了代理鍵,標準化了業務系統編碼類型不統一的問題,創建了數據倉庫維度表和事實表的關聯體系,也爲緩慢變化維的實現奠基了基礎。併發
DWC(Data Warehouse Center)層主要管理固化報表的數據存儲,數據主要來源於 DWD 層,根據前臺所需數據創建物理模型,使用 ETL 抽取 DWD 層數據推送給 DWC 層,這樣顯著減小前臺應用直接關聯 DWD 層查詢明細數據的成本,提升平臺數據獲取的速度。高併發
DWB數據層(data warehouse base)表示 基礎數據層,存儲的是客觀數據,通常用做中間層,能夠認爲是大量指標的數據層。post
DWS數據層 (data warehouse service)表示 服務數據層,基於DWB上的基礎數據,整合彙總成分析某一個主題域的服務數據,通常是寬表。性能
DM(Data Mart)層即數據集市,將指標與維度創建物理模型組成數據集市,這是 OLAP 的數據基礎。該層實現了合併不一樣系統的數據源來知足面向主題的業務需求,它的建模是終端用戶驅動的,也是由業務需求驅動的。按主題,維度及 KPI 指標對 DM 層進行模型設計、建模,DM 層數據是將 DWD 層數據進行進一步整合、轉換、彙總、計算等 ETL 操做處理獲取的。學習
主要是執行基本平常的事務處理,好比數據庫記錄的增刪查改。好比在銀行的一筆交易記錄,就是一個典型的事務。
聯機分析處理OLAP(On-Line Analytical Processing) 是數據倉庫系統的主要應用,支持複雜的分析操做,側重決策支持,而且提供直觀易懂的查詢結果。典型的應用就是複雜的動態的報表系統。
OLAP是數據倉庫的核心部心,所謂數據倉庫是對於大量已經由OLTP造成的數據的一種分析型的數據庫,用於處理商業智能、決策支持等重要的決策信息;數據倉庫是在數據庫應用到必定程序以後而對歷史數據的加工與分析,讀取較多,更新較少。
DROP TABLE IF EXISTS tbl_stg;
CREATE TABLE tbl_stg (
id INT,
name STRING,
cty STRING,
st STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
複製代碼
DROP TABLE IF EXISTS tbl_dim;
CREATE TABLE tbl_dim (
sk INT,
id INT,
name STRING,
cty STRING,
st STRING,
version INT,
effective_date DATE,
expiry_date DATE)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
複製代碼
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY tbl_stg.id) + t2.sk_max,
tbl_stg.*,
1,
CAST('1900-01-01' AS DATE),
CAST('2200-01-01' AS DATE)
from tbl_stg CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
複製代碼
drop table tbl_stg;
LOAD DATA LOCAL INPATH '/root/opendir/test-data/aa.txt' INTO TABLE tbl_stg;
複製代碼
SET hivevar:pre_date = DATE_ADD(CURRENT_DATE(),-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
複製代碼
1,張三,US,CA -->scd2 scd1
2,李四,US,CB -->刪除
3,王五,CA,BB -->沒變
4,趙六,CA,BC -->scd1
5,老劉,AA,AA -->scd2
1,張,U,C
3,王五,CA,BB
4,趙六,AC,CB
5,劉,AA,AA
6,老楊,DD,DD
UPDATE tbl_dim
SET expiry_date = ${hivevar:pre_date}
WHERE sk IN
(SELECT a.sk FROM (
SELECT sk,id,name FROM tbl_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN tbl_stg b ON a.id=b.id
WHERE b.id IS NULL OR a.name<>b.name);
tbl_dim.sk tbl_dim.id tbl_dim.name tbl_dim.cty tbl_dim.st tbl_dim.version tbl_dim.effective_date tbl_dim.expiry_date
1 1 張三 US CA 1 1900-01-01 2018-09-03 -->過時
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->過時(記錄刪除)
3 3 王五 CA BB 1 1900-01-01 2200-01-01
4 4 趙六 CA BC 1 1900-01-01 2200-01-01
5 5 老劉 AA AA 1 1900-01-01 2018-09-03 -->過時
複製代碼
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
t1.version,
t1.effective_date,
t1.expiry_date
FROM
(
SELECT
t2.id id,
t2.name name,
t2.cty cty,
t2.st st,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
FROM tbl_dim t1 INNER JOIN tbl_stg t2
ON t1.id=t2.id AND t1.name<>t2.name AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN tbl_dim t3 ON t1.id = t3.id AND t3.expiry_date = ${hivevar:max_date}
WHERE t3.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
tbl_dim.sk tbl_dim.id tbl_dim.name tbl_dim.cty tbl_dim.st tbl_dim.version tbl_dim.effective_date tbl_dim.expiry_date
1 1 張三 US CA 1 1900-01-01 2018-09-03
6 1 張 U C 2 2018-09-03 2200-01-01 --->新增行
2 2 李四 US CB 1 1900-01-01 2018-09-03
3 3 王五 CA BB 1 1900-01-01 2200-01-01
4 4 趙六 CA BC 1 1900-01-01 2200-01-01
5 5 老劉 AA AA 1 1900-01-01 2018-09-03
7 5 劉 AA AA 2 2018-09-03 2200-01-01 --->新增行
複製代碼
用先delete再insert代替update,由於SCD1自己就不保存歷史數據,因此這裏更新維度表裏的全部cty或st改變的記錄,而不是僅僅更新當前版本的記錄
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.sk,a.id,a.name,b.cty,b.st,a.version,a.effective_date,a.expiry_date FROM tbl_dim a, tbl_stg b
WHERE a.id=b.id AND (a.cty <> b.cty OR a.st <> b.st);
DELETE FROM tbl_dim WHERE sk IN (SELECT sk FROM tmp);
INSERT INTO tbl_dim SELECT * FROM tmp;
原始數據
1,張三,US,CA -->scd2 scd1
2,李四,US,CB -->刪除
3,王五,CA,BB -->沒變
4,趙六,CA,BC -->scd1
5,老劉,AA,AA -->scd2
過分業務數據
1,張,U,C
3,王五,CA,BB
4,趙六,AC,CB
5,劉,AA,AA
6,老楊,DD,DD
複製代碼
修改了第4條數據的cty列和st列(按SCD1處理)
6 1 張 U C 2 2018-09-03 2200-01-01 -->scd2 scd1
1 1 張三 U C 1 1900-01-01 2018-09-03 -->scd2 scd1
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->刪除
3 3 王五 CA BB 1 1900-01-01 2200-01-01 -->沒變
4 4 趙六 AC CB 1 1900-01-01 2200-01-01 -->scd1
5 5 老劉 AA AA 1 1900-01-01 2018-09-03 -->scd2
7 5 劉 AA AA 2 2018-09-03 2200-01-01 -->scd2
複製代碼
INSERT INTO tbl_dim
SELECT
ROW_NUMBER() OVER (ORDER BY t1.id) + t2.sk_max,
t1.id,
t1.name,
t1.cty,
t1.st,
1,
${hivevar:pre_date},
${hivevar:max_date}
FROM
(
SELECT t1.* FROM tbl_stg t1 LEFT JOIN tbl_dim t2 ON t1.id = t2.id
WHERE t2.sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(sk),0) sk_max FROM tbl_dim) t2;
1,張三,US,CA -->scd2 scd1
2,李四,US,CB -->刪除
3,王五,CA,BB -->沒變
4,趙六,CA,BC -->scd1
5,老劉,AA,AA -->scd2
1,張,U,C
3,王五,CA,BB
4,趙六,AC,CB
5,劉,AA,AA
6,老楊,DD,DD
6 1 張 U C 2 2018-09-03 2200-01-01 -->scd2 scd1
1 1 張三 U C 1 1900-01-01 2018-09-03 -->scd2 scd1
2 2 李四 US CB 1 1900-01-01 2018-09-03 -->刪除
3 3 王五 CA BB 1 1900-01-01 2200-01-01 -->沒變
4 4 趙六 AC CB 1 1900-01-01 2200-01-01 -->scd1
5 5 老劉 AA AA 1 1900-01-01 2018-09-03 -->scd2
7 5 劉 AA AA 2 2018-09-03 2200-01-01 -->scd2
8 6 老楊 DD DD 1 2018-09-03 2200-01-01 -->新增
複製代碼
drop table if exists articles;
create external table articles(
article_id STRING,
url STRING,
kws array< STRING >
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
LOCATION '/hivepeixun/articles';
複製代碼
drop table if exists user_actions;
複合數組元素,拆分split
create external table user_actions(
user_id STRING,
article_id STRING,
ts STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/hivepeixun/users';
collect_set去重功能:
select user_id, collect_set(article_id) from user_actions group by user_id;
collect_list沒有去重功能:用戶分組後,進行文章彙總成集合:
select user_id, collect_list(article_id) from user_actions group by user_id;
:
select user_id,sort_array(collect_list(article_id)) from user_actions group by user_id;
拆開某條字段,一條數據變成多條記錄,沒有關聯時爲空的就過濾掉了:
select article_id,kw from articles2 lateral view explode(kws) t as kw;
select a.* , b.kw from user_actions as a left outer join (select article_id, kw from articles lateral view explode(kws) t as kw) b on (a.article_id =b.article_id)
複製代碼
select a.user_id , b.kw , count(1) as weight from user_actions as a
left outer join (select article_id, kw from articles lateral view explode(kws) t as kw)
b on (a.article_id =b.article_id) group by a.user_id ,b.kw order by a.user_id ,weight desc;
複製代碼
未完待續
本節內容主要探討了數據倉庫模型與緩慢變化維度技術深度剖析,本文參考了經典案例,並進行從新編排,內容有所變化,可能部分截圖來自github公開源碼,部分是個人測試案例,若有雷同某位大神私有內容,請直接留言於我,我來從新修正案例。
秦凱新 於深圳