數據倉庫是公司數據發展到必定規模後必然須要提供的一種基礎服務,也是「數據智能」建設的基礎環節。迅速獲取數據反饋不只有利於改善產品及用戶體驗,更有利於公司的科學決策,所以獲取數據的實時性尤其重要。mysql
目前企業的數倉建設大可能是離線一套,實時一套。業務要求低延時的使用實時數倉;業務複雜的使用離線數倉。架構十分複雜,須要使用不少系統和計算框架,這就要求企業儲備多方面的人才,致使人才成本較高,且出了問題難以排查,終端用戶也須要熟悉多種語法。本文分析目前的數倉架構,探索離線和實時數倉是否能放在一塊兒考慮,探索Flink的統一架構是否能解決大部分問題。sql
文末有福利,可下載電子書。數據庫
數據倉庫能夠分爲三層:ODS(原始數據層)、DW(數據倉庫層)、ADS(應用數據層)。架構
從日誌或者業務DB傳輸過來的原始數據,傳統的離線數倉作法也有直接用CDC (Change Data Capture) 工具週期同步到數倉裏面。用一套統一的Kafka來承接這個角色,可讓數據更實時的落入數倉,也能夠在這一層統一實時和離線的。併發
DW層通常也分爲DWD層和DWS層:app
和DWS不一樣的是,這一層直接面向用戶的數據服務,不須要再次計算,已是最終須要的數據。框架
主要分爲兩條鏈路:分佈式
主流的數倉架構仍然是Lambda架構,Lambda架構雖然複雜,可是它能覆蓋業務上須要的場景,對業務來講,是最靈活的方式。函數
Lambda架構分爲兩條鏈路:工具
上圖標出了1-9條邊,每條邊表明數據的轉換,就是大數據的計算,本文後續將分析這些邊,探索Flink在其中能夠發揮的做用。
先說下元數據的管理,離線數倉有Hive metastore來管理元數據,可是單純的Kafka不具有元數據管理的能力,這裏推薦兩種作法:
搭建起schema registry服務後,經過confluent的url便可獲取到表的schema信息,對於上百個字段的表,它能夠省編寫Flink做業時的不少事,後續Flink也正在把它的schema推斷功能結合Confluent schema registry。可是它仍然省不掉建立表的過程,用戶也須要填寫Confluent對應的URL。
目前Flink內置已提供了HiveCatalog,Kafka的表能夠直接集成到Hive metastore中,用戶在SQL中能夠直接使用這些表。可是Kafka的start-offset一些場景須要靈活的配置,爲此,Flink也正在提供 LIKE [1] 和 Table Hints [2] 等手段來解決。
Flink中離線數倉和實時數倉都使用Hive Catalog:
use catalog my_hive; -- build streaming database and tables; create database stream_db; use stream_db; create table order_table ( id long, amount double, user_id long, status string, ts timestamp, … -- 可能還有幾十個字段 ts_day string, ts_hour string ) with ( ‘connector.type’ = ‘kafka’, … -- Kafka table相關配置 ); -- build batch database and tables; create database batch_db; use batch_db; create table order_table like stream_db.order_table (excluding options) partitioned by (ts_day, ts_hour) with ( ‘connector.type’ = ‘hive’, … -- Hive table相關配置 );
使用Catalog,後續的計算能夠徹底複用批和流,提供相同的體驗。
計算①和⑤分別是實時數倉的導入和離線數倉的導入,近來,更加實時的離線數倉導入愈來愈成爲數據倉庫的常規作法,Flink的導入可讓離線數倉的數據更實時化。
之前主要經過DataStream + StreamingFileSink的方式進行導入,可是不支持ORC和沒法更新HMS。
Flink streaming integrate Hive後,提供Hive的streaming sink [3],用SQL的方式會更方便靈活,使用SQL的內置函數和UDF,並且流和批能夠複用,運行兩個流計算做業。
insert into [stream_db.|batch_db.]order_table select … from log_table;
計算②和⑥分別是實時數倉和離線數倉的中間數據處理,這裏面主要有三種計算:
與離線計算不一樣,離線計算只用關心某個時間點的維表數據,而Streaming的做業持續運行,因此它關注的不能只是靜態數據,須要是動態的維表。
另外爲了Join的效率,streaming做業每每是join一個數據庫表,而不只僅是Hive表。
例子:
-- stream 維表 use stream_db; create table user_info ( user_id long, age int, address, primary key(user_id) ) with ( ‘connector.type’ = ‘jdbc’, ... ); -- 將離線數倉的維表導入實時數倉中 insert into user_info select * from batch_db.user_info; -- 維表Join,SQL批流複用 insert into order_with_user_age select * from order_table join user_info for system_time as of order_table.proctime on user_info.user_id = user_info.user_id;
這裏有個很是麻煩的事情,那就是在實時數倉中,須要按時週期調度更新維表到實時維表數據庫中,那能不能直接Join離線數倉的Hive維表呢?目前社區也正在開發Hive維表,它有哪些挑戰:
例子:
select age, avg(amount) from order_with_user_age group by age;
一句簡單的聚合SQL,它在批計算和流計算的執行模式是徹底不一樣的。
Streaming的聚合和離線計算的聚合最大的不一樣在於它是一個動態表[4],它的輸出是在持續變化的。動態表的概念簡單來講,一個streaming的count,它的輸出是由輸入來驅動的,而不是像batch同樣,獲取所有輸入後纔會輸出,因此,它的結果是動態變化的:
有狀態計算後的輸出:
例子:
-- batch:計算完成後,一次性輸出到mysql中,同key只有一個數據 -- streaming:mysql裏面的數據不斷更新,不斷變化 insert into mysql_table select age, avg(amount) from order_with_user_age group by age; -- batch: 同key只有一個數據,append便可 insert into hive_table select age, avg(amount) from order_with_user_age group by age; -- streaming: kafka裏面的數據不斷append,而且多出一列,來表示這是upsert的消息,後續的Flink消費會自動作出機制來處理upsert insert into kafka_table select age, avg(amount) from order_with_user_age group by age;
離線數倉能夠進行計算⑨,對明細數據或者彙總數據均可以進行ad-hoc的查詢,可讓數據分析師進行靈活的查詢。
目前實時數倉一個比較大的缺點是不能Ad-hoc查詢,由於它自己沒有保存歷史數據,Kafka可能能夠保存3天以上的數據,可是一是存儲成本高、二是查詢效率也很差。
一個思路是提供OLAP數據庫的批流統一Sink組件:
本文從目前的Lambda架構出發,分析了Flink一棧式數倉計算方案的能力,本文中一些Flink新功能還在快速迭代演進中,隨着不斷的探索和實踐,但願朝着計算一體化的方向逐漸推動,未來的數倉架構但願能真正統一用戶的離線和實時,提供統一的體驗: