一套 SQL 搞定數據倉庫?Flink有了新嘗試

數據倉庫是公司數據發展到必定規模後必然須要提供的一種基礎服務,也是「數據智能」建設的基礎環節。迅速獲取數據反饋不只有利於改善產品及用戶體驗,更有利於公司的科學決策,所以獲取數據的實時性尤其重要。mysql

目前企業的數倉建設大可能是離線一套,實時一套。業務要求低延時的使用實時數倉;業務複雜的使用離線數倉。架構十分複雜,須要使用不少系統和計算框架,這就要求企業儲備多方面的人才,致使人才成本較高,且出了問題難以排查,終端用戶也須要熟悉多種語法。本文分析目前的數倉架構,探索離線和實時數倉是否能放在一塊兒考慮,探索Flink的統一架構是否能解決大部分問題。sql

文末有福利,可下載電子書。數據庫

數倉架構

640.jpeg

數據倉庫能夠分爲三層:ODS(原始數據層)、DW(數據倉庫層)、ADS(應用數據層)。架構

1. ODS (Operation Data Store) 層

從日誌或者業務DB傳輸過來的原始數據,傳統的離線數倉作法也有直接用CDC (Change Data Capture) 工具週期同步到數倉裏面。用一套統一的Kafka來承接這個角色,可讓數據更實時的落入數倉,也能夠在這一層統一實時和離線的。併發

2. DW (Data warehouse) 層

DW層通常也分爲DWD層和DWS層:app

  • DWD (Data warehouse detail) 層:明細數據層,這一層的數據應該是通過清洗的,乾淨的、準確的數據,它包含的信息和ODS層相同,可是它遵循數倉和數據庫的標準Schema定義。
  • DWS (Data warehouse service) 層:彙總數據層,這一層可能通過了輕度的聚合,多是星型或雪花模型的結構數據,這一層已經作了一些業務層的計算,用戶能夠基於這一層,計算出數據服務所需數據。

3. ADS (Application Data Store) 層

和DWS不一樣的是,這一層直接面向用戶的數據服務,不須要再次計算,已是最終須要的數據。框架

主要分爲兩條鏈路:分佈式

  1. 業務DB和日誌 -> Kafka -> 實時數倉 (Kafka + Dim維表) -> BI DB -> 數據服務
  2. 業務DB和日誌 -> Kafka -> 離線數倉 (Hive metastore + HDFS) -> BI DB -> 數據服務

主流的數倉架構仍然是Lambda架構,Lambda架構雖然複雜,可是它能覆蓋業務上須要的場景,對業務來講,是最靈活的方式。函數

Lambda架構分爲兩條鏈路:工具

  • 傳統離線數據具備穩定、計算複雜、靈活的優勢,運行批計算,保證T+1的報表產生和靈活的Ad-hoc查詢。
  • 實時數倉提供低延時的數據服務,傳統的離線數倉每每都是T+1的延時,這致使分析人員無法作一些實時化的決策,而實時數倉整條鏈路的延遲最低甚至能夠作到秒級,這不但加快了分析和決策,並且也給更多的業務帶來了可能,好比實時化的監控報警。Flink的強項是實時計算、流計算,而Kafka是實時數倉存儲的核心。

上圖標出了1-9條邊,每條邊表明數據的轉換,就是大數據的計算,本文後續將分析這些邊,探索Flink在其中能夠發揮的做用。

Flink一棧式計算

元數據

先說下元數據的管理,離線數倉有Hive metastore來管理元數據,可是單純的Kafka不具有元數據管理的能力,這裏推薦兩種作法:

1. Confluent schema registry

搭建起schema registry服務後,經過confluent的url便可獲取到表的schema信息,對於上百個字段的表,它能夠省編寫Flink做業時的不少事,後續Flink也正在把它的schema推斷功能結合Confluent schema registry。可是它仍然省不掉建立表的過程,用戶也須要填寫Confluent對應的URL。

2. Catalog

目前Flink內置已提供了HiveCatalog,Kafka的表能夠直接集成到Hive metastore中,用戶在SQL中能夠直接使用這些表。可是Kafka的start-offset一些場景須要靈活的配置,爲此,Flink也正在提供 LIKE [1] 和 Table Hints [2] 等手段來解決。

Flink中離線數倉和實時數倉都使用Hive Catalog:

use catalog my_hive;
-- build streaming database and tables;
create database stream_db;
use stream_db;
create table order_table (
    id long,
    amount double,
    user_id long,
    status string,
    ts timestamp,
    … -- 可能還有幾十個字段
    ts_day string,
    ts_hour string
) with (
    ‘connector.type’ = ‘kafka’,
    … -- Kafka table相關配置
);
-- build batch database and tables;
create database batch_db;
use batch_db;
create table order_table like stream_db.order_table (excluding options)
partitioned by (ts_day, ts_hour)
with (
    ‘connector.type’ = ‘hive’,
    … -- Hive table相關配置
);

使用Catalog,後續的計算能夠徹底複用批和流,提供相同的體驗。

數倉導入

計算①和⑤分別是實時數倉的導入和離線數倉的導入,近來,更加實時的離線數倉導入愈來愈成爲數據倉庫的常規作法,Flink的導入可讓離線數倉的數據更實時化。

之前主要經過DataStream + StreamingFileSink的方式進行導入,可是不支持ORC和沒法更新HMS。

Flink streaming integrate Hive後,提供Hive的streaming sink [3],用SQL的方式會更方便靈活,使用SQL的內置函數和UDF,並且流和批能夠複用,運行兩個流計算做業。

insert into [stream_db.|batch_db.]order_table select … from log_table;

數據處理

計算②和⑥分別是實時數倉和離線數倉的中間數據處理,這裏面主要有三種計算:

  1. ETL:和數據導入同樣,批流沒有區別。
  2. 維表Join:維表補字段是很常見的數倉操做,離線數倉中基本都是直接Join Hive表便可,可是Streaming做業卻有些不一樣,下文將詳細描述。
  3. Aggregation:Streaming做業在這些有狀態的計算中,產生的不是一次肯定的值,而多是不斷變化的值。

維表Join

與離線計算不一樣,離線計算只用關心某個時間點的維表數據,而Streaming的做業持續運行,因此它關注的不能只是靜態數據,須要是動態的維表。

另外爲了Join的效率,streaming做業每每是join一個數據庫表,而不只僅是Hive表。

例子:

-- stream 維表
use stream_db;
create table user_info (
    user_id long,
    age int,
    address,
    primary key(user_id)
) with (
    ‘connector.type’ = ‘jdbc’,
    ...
);
 
-- 將離線數倉的維表導入實時數倉中
insert into user_info select * from batch_db.user_info;
 
-- 維表Join,SQL批流複用
insert into order_with_user_age select * from order_table join user_info for system_time as of order_table.proctime on user_info.user_id = user_info.user_id;

這裏有個很是麻煩的事情,那就是在實時數倉中,須要按時週期調度更新維表到實時維表數據庫中,那能不能直接Join離線數倉的Hive維表呢?目前社區也正在開發Hive維表,它有哪些挑戰:

  1. Hive維表太大,放不進Cache中:
  • 考慮Shuffle by key,分佈式的維表Join,減小單併發Cache的數據量
  • 考慮將維表數據放入State中
  1. 維表更新問題:
  • 簡單的方案是TTL過時
  • 複雜一些的方案是實現Hive streaming source,並結合Flink的watermark機制

有狀態計算和數據導出

例子:

select age, avg(amount) from order_with_user_age group by age;

一句簡單的聚合SQL,它在批計算和流計算的執行模式是徹底不一樣的。

Streaming的聚合和離線計算的聚合最大的不一樣在於它是一個動態表[4],它的輸出是在持續變化的。動態表的概念簡單來講,一個streaming的count,它的輸出是由輸入來驅動的,而不是像batch同樣,獲取所有輸入後纔會輸出,因此,它的結果是動態變化的:

  • 若是在SQL內部,Flink內部的retract機制會保證SQL 的結果的與批同樣。
  • 若是是外部的存儲,這給sink帶來了挑戰。

有狀態計算後的輸出:

  • 若是sink是一個可更新的數據庫,好比HBase/Redis/JDBC,那這看起來不是問題,咱們只須要不斷的去更新就行了。
  • 可是若是是不可更新的存儲呢,咱們沒有辦法去更新本來的數據。爲此,Flink提出了Changelog的支持[5],想內置支持這種sink,輸出特定Schema的數據,讓下游消費者也能很好的work起來。

例子:

-- batch:計算完成後,一次性輸出到mysql中,同key只有一個數據
-- streaming:mysql裏面的數據不斷更新,不斷變化
insert into mysql_table select age, avg(amount) from order_with_user_age group by age;
-- batch: 同key只有一個數據,append便可
insert into hive_table select age, avg(amount) from order_with_user_age group by age;
-- streaming: kafka裏面的數據不斷append,而且多出一列,來表示這是upsert的消息,後續的Flink消費會自動作出機制來處理upsert
insert into kafka_table select age, avg(amount) from order_with_user_age group by age;

AD-HOC與OLAP

離線數倉能夠進行計算⑨,對明細數據或者彙總數據均可以進行ad-hoc的查詢,可讓數據分析師進行靈活的查詢。

目前實時數倉一個比較大的缺點是不能Ad-hoc查詢,由於它自己沒有保存歷史數據,Kafka可能能夠保存3天以上的數據,可是一是存儲成本高、二是查詢效率也很差。

一個思路是提供OLAP數據庫的批流統一Sink組件:

  • Druid sink
  • Doris sink
  • Clickhouse sink
  • HBase/Phoenix sink

總結

本文從目前的Lambda架構出發,分析了Flink一棧式數倉計算方案的能力,本文中一些Flink新功能還在快速迭代演進中,隨着不斷的探索和實踐,但願朝着計算一體化的方向逐漸推動,未來的數倉架構但願能真正統一用戶的離線和實時,提供統一的體驗:

  • 統一元數據
  • 統一SQL開發
  • 統一數據導入與導出
  • 未來考慮統一存儲
相關文章
相關標籤/搜索