Apache Flink 漫談系列 - 持續查詢(Continuous Queries)

實際問題

咱們知道在流計算場景中,數據是源源不斷的流入的,數據流永遠不會結束,那麼計算就永遠不會結束,若是計算永遠不會結束的話,那麼計算結果什麼時候輸出呢?本篇將介紹Apache Flink利用持續查詢來對流計算結果進行持續輸出的實現原理。sql

數據管理

在介紹持續查詢以前,咱們先看看Apache Flink對數據的管理和傳統數據庫對數據管理的區別,以MySQL爲例,以下圖:
數據庫

如上圖所示傳統數據庫是數據存儲和查詢計算於一體的架構管理方式,這個很明顯,oracle數據庫不可能管理MySQL數據庫數據,反之亦然,每種數據庫廠商都有本身的數據庫管理和存儲的方式,各自有特有的實現。在這點上Apache Flink海納百川(也有corner case),將data store 進行抽象,分爲source(讀) 和 sink(寫)兩種類型接口,而後結合不一樣存儲的特色提供經常使用數據存儲的內置實現,固然也支持用戶自定義的實現。架構

那麼在宏觀設計上Apache Flink與傳統數據庫同樣均可以對數據表進行SQL查詢,並將產出的結果寫入到數據存儲裏面,那麼Apache Flink上面的SQL查詢和傳統數據庫查詢的區別是什麼呢?Apache Flink又是如何作到求同(語義相同)存異(實現機制不一樣),完美支持ANSI-SQL的呢?oracle

靜態查詢

傳統數據庫中對錶(好比 flink_tab,有user和clicks兩列,user主鍵)的一個查詢SQL(select * from flink_tab)在數據量容許的狀況下,會馬上返回表中的全部數據,在查詢結果顯示以後,對數據庫表flink_tab的DML操做將與執行的SQL無關了。也就是說傳統數據庫下面對錶的查詢是靜態查詢,將計算的最終查詢的結果當即輸出,以下:app

select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      1 |
+----+------+--------+
1 row in set (0.00 sec)

當我執行完上面的查詢,查詢結果當即返回,上面狀況告訴咱們表 flink_tab裏面只有一條記錄,id=1,user=Mary,clicks=1; 這樣傳統數據庫表的一條查詢語句就徹底結束了。傳統數據庫表在查詢那一刻咱們這裏叫Static table,是指在查詢的那一刻數據庫表的內容再也不變化了,查詢進行一次計算完成以後表的變化也與本次查詢無關了,咱們將在Static Table 上面的查詢叫作靜態查詢。優化

持續查詢

什麼是連續查詢呢?連續查詢發生在流計算上面,在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中咱們提到過Dynamic Table,連續查詢是做用在Dynamic table上面的,永遠不會結束的,隨着表內容的變化計算在不斷的進行着...ui

靜態/持續查詢特色

靜態查詢和持續查詢的特色就是《Apache Flink 漫談系列 - 流表對偶(duality)性》中所提到的批與流的計算特色,批一次查詢返回一個計算結果就結束查詢,流一次查詢不斷修正計算結果,查詢永遠不結束,表格示意以下:阿里雲

查詢類型 計算次數 計算結果
靜態查詢 1 最終結果
持續查詢 無限 不斷更新

靜態/持續查詢關係

接下來咱們以flink_tab表實際操做爲例,體驗一下靜態查詢與持續查詢的關係。假如咱們對flink_tab表再進行一條增長和一次更新操做,以下:spa

MySQL> insert into flink_tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Mary';
Query OK, 1 row affected (0.06 sec)

這時候咱們再進行查詢 select * from flink_tab ,結果以下:插件

MySQL> select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      2 |
|  2 | Bob  |      1 |
+----+------+--------+
2 rows in set (0.00 sec)

那麼咱們看見,相同的查詢SQL(select * from flink_tab),計算結果徹底 不 同樣了。這說明相同的sql語句,在不一樣的時刻執行計算,獲得的結果可能不同(有點像廢話),就以下圖同樣:

假設不斷的有人在對錶flink_tab作操做,同時有一我的間歇性的發起對錶數據的查詢,上圖咱們只是在三個時間點進行了3次查詢。而且在這段時間內數據表的內容也在變化。引發上面變化的DML以下:

MySQL> insert into flink_tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update flink_tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

到如今咱們不難想象,上面圖內容的核心要點以下:

  • 時間
  • 表數據變化
  • 觸發計算
  • 計算結果更新

接下來咱們利用傳統數據庫現有的機制模擬一下持續查詢...

無PK的 Append only 場景

接下來咱們把上面隱式存在的時間屬性timestamp做爲表flink_tab_ts(timestamp,user,clicks三列,無主鍵)的一列,再寫一個 觸發器(Trigger) 示例觀察一下:

timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 Llz 1
1525099056 Bob 2
1525099065 Mary 3
// INSERT 的時候查詢一下數據flink_tab_ts,將結果寫到trigger.sql中
 DELIMITER ;;
create trigger flink_tab_ts_trigger_insert after insert
on flink_tab_ts for each row
  begin
       select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql';
  end ;;
DELIMITER ;

上面的trigger要將查詢結果寫入本地文件,默認MySQL是不容許寫入的,咱們查看一下:

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         | NULL  |
+--------------------------+-------+
2 rows in set (0.00 sec)

上面secure_file_priv屬性爲NULL,說明MySQL不容許寫入file,我須要修改my.cnf在添加secure_file_priv=''打開寫文件限制;

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         |       |
+--------------------------+-------+
2 rows in set (0.00 sec)

下面咱們對flink_tab_ts進行INSERT操做:

咱們再來看看6次trigger 查詢計算的結果:

你們到這裏發現我寫了Trigger的存儲過程以後,每次在數據表flink_tab_ts進行DML操做的時候,Trigger就會觸發一次查詢計算,產出一份新的計算結果,觀察上面的查詢結果發現,結果表不停的增長(Append only)。

有PK的Update場景

咱們利用flink_tab_ts的6次DML操做和自定義的觸發器TriggerL來介紹了什麼是持續查詢,作處理靜態查詢與持續查詢的關係。那麼上面的演示目的是爲了說明持續查詢,全部操做都是insert,沒有基於主鍵的更新,也就是說Trigger產生的結果都是append only的,那麼你們想想,若是咱們操做flink_tab這張表,按主鍵user進行插入和更新操做,一樣利用Trigger機制來進行持續查詢,結果是怎樣的的呢? 初始化表,trigger:

drop table flink_tab;
create table flink_tab(
    user VARCHAR(100) NOT NULL,
    clicks INT NOT NULL,
    PRIMARY KEY (user)
 );

 DELIMITER ;;
create trigger flink_tab_trigger_insert after insert
on flink_tab for each row
  begin
       select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

DELIMITER ;;
create trigger flink_tab_trigger_ after update
on flink_tab for each row
  begin
        select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

 一樣我作以下6次DML操做,Trigger 6次查詢計算:

在來看看此次的結果與append only 有什麼不一樣?

我想你們早就知道這結果了,數據庫裏面定義的PK全部變化會按PK更新,那麼觸發的6次計算中也會獲得更新後的結果,這應該不難理解,查詢結果也是不斷更新的(Update)!

關係定義 

上面Append Only 和 Update兩種場景在MySQL上面均可以利用Trigger機制模擬 持續查詢的概念,也就是說數據表中每次數據變化,咱們都觸發一次相同的查詢計算(只是計算時候數據的集合發生了變化),由於數據表不斷的變化,這個表就能夠看作是一個動態表Dynamic Table,而查詢SQL(select * from flink_tab_ts) 被觸發器Trigger在知足某種條件後不停的觸發計算,進而也不斷地產生新的結果。這種做用在Dynamic Table,而且有某種機制(Trigger)不斷的觸發計算的查詢咱們就稱之爲 持續查詢。

那麼到底靜態查詢和動態查詢的關係是什麼呢?在語義上 持續查詢 中的每一次查詢計算的觸發都是一次靜態查詢(相對於當時查詢的時間點),  在實現上 Apache Flink會利用上一次查詢結果+當前記錄 以增量的方式完成查詢計算。

特別說明: 上面咱們利用 數據變化+Trigger方式描述了持續查詢的概念,這裏有必要特別強調一下的是數據庫中trigger機制觸發的查詢,每次都是一個全量查詢,這與Apache Flink上面流計算的持續查詢概念相同,但實現機制徹底不一樣,Apache Flink上面的持續查詢內部實現是增量處理的,隨着時間的推移,每條數據的到來實時處理當前的那一條記錄,不會處理曾經來過的歷史記錄!

Apache Flink 如何作到持續查詢

動態表上面持續查詢

在 《Apache Flink 漫談系列 - 流表對偶(duality)性》 中咱們瞭解到流和表能夠相互轉換,在Apache Flink流計算中攜帶流事件的Schema,通過算子計算以後再產生具備新的Schema的事件,流入下游節點,在產生新的Schema的Event和不斷流轉的過程就是持續查詢做用的結果,以下圖:

增量計算

咱們進行查詢大多數場景是進行數據聚合,好比查詢SQL中利用count,sum等aggregate function進行聚合統計,那麼流上的數據源源不斷的流入,咱們既不能等全部事件流入結束(永遠不會結束)再計算,也不會每次來一條事件就像傳統數據庫同樣將所有事件集合從新總體計算一次,在持續查詢的計算過程當中,Apache Flink採用增量計算的方式,也就是每次計算都會將計算結果存儲到state中,下一條事件到來的時候利用上次計算的結果和當前的事件進行聚合計算,好比 有一個訂單表,以下:

一個簡單的計數和求和查詢SQL:

// 求訂單總數和全部訂單的總金額
select count(id) as cnt,sum(amount)as sumAmount from order_tab;

這樣一個簡單的持續查詢計算,Apache Flink內部是如何處理的呢?以下圖:

如上圖,Apache Flink中每來一條事件,就進行一次計算,而且每次計算後結果會存儲到state中,供下一條事件到來時候進行計算,即:

result(n) = calculation(result(n-1), n)。

無PK的Append Only 場景 

在實際的業務場景中,咱們只須要進行簡單的數據統計,而後就將統計結果寫入到業務的數據存儲系統裏面,好比上面統計訂單數量和總金額的場景,訂單表自己是一個append only的數據源(假設沒有更新,截止到2018.5.14日,Apache Flink內部支持的數據源都是append only的),在持續查詢過程當中通過count(id),sum(amount)統計計算以後產生的動態表也是append only的,種場景Apache Flink內部只須要進行aggregate function的聚合統計計算就能夠,以下:

有PK的Update 場景

如今咱們將上面的訂單場景稍微變化一下,在數據表上面咱們將金額字段amount,變爲地區字段region,數據以下:

查詢統計的變爲,在計算具備相同訂單數量的地區數量;查詢SQL以下:

CREATE TABLE order_tab(
   id BIGINT,
   region VARCHAR
 ) 

CREATE TABLE region_count_sink(
   order_cnt BIGINT, 
   region_cnt BIGINT,
   PRIMARY KEY(order_cnt) -- 主鍵
) 

-- 按地區分組計算每一個地區的訂單數量
CREATE VIEW order_count_view AS
    SELECT
        region, count(id) AS order_cnt
    FROM  order_tab 
    GROUP BY region;

-- 按訂單數量分組統計具備相同訂單數量的地區數量
INSERT INTO region_count_sink 
    SELECT 
        order_cnt,
        count(region) as region_cnt
    FROM order_count_view 
    GROUP BY order_cnt;

上面查詢SQL的代碼結構以下(這個圖示在Alibaba StreamCompute的集成IDE環境生成的,瞭解更多):

上面SQL中咱們發現有兩層查詢計算邏輯,第一個查詢計算邏輯是與SOURCE相連的按地區統計訂單數量的分組統計,第二個查詢計算邏輯是在第一個查詢產出的動態表上面進行按訂單數量統計地區數量的分組統計,咱們一層一層分析。

錯誤處理
  • 第一層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region; 

  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

按照第一層分析的結果,再分析第二層產出的結果,咱們分析的過程是對的,可是最終寫到sink表的計算結果是錯誤的,那咱們錯在哪裏了呢?

其實當 (SH,2)這條記錄來的時候,之前來過的(SH, 1)已是髒數據了,當(BJ, 2)來的時候,已經參與過計算的(BJ, 1)也變成髒數據了,一樣當(BJ, 3)來的時候,(BJ, 2)也是髒數據了,上面的分析,沒有處理髒數據進而致使最終結果的錯誤。那麼Apache Flink內部是如何正確處理的呢?

正確處理
  • 第一層分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二層分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

上面咱們將有更新的事件進行打標的方式來處理髒數據,這樣在Apache Flink內部計算的時候 算子會根據事件的打標來處理事件,在aggregate function中有兩個對應的方法(retract和accumulate)來處理不一樣標識的事件,如上面用到的count AGG,內部實現以下:

def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L // acc.f0 存儲記數
}

def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L //acc.f0 存儲記數
    }
}

Apache Flink內部這種爲事件進行打標的機制叫作 retraction。retraction機制保障了在流上已經流轉到下游的髒數據須要被撤回問題,進而保障了持續查詢的正確語義。

Apache Flink Connector 類型

本篇一開始就對比了MySQL的數據存儲和Apache Flink數據存儲的區別,Apache Flink目前是一個計算平臺,將數據的存儲以高度抽象的插件機制與各類已有的數據存儲無縫對接。目前Apache Flink中將數據插件稱之爲連接器Connector,Connnector又按數據的讀和寫分紅Soruce(讀)和Sink(寫)兩種類型。對於傳統數據庫表,PK是一個很重要的屬性,在頻繁的按某些字段(PK)進行更新的場景,在表上定義PK很是重要。那麼做爲徹底支持ANSI-SQL的Apache Flink平臺在Connector上面是否也支持PK的定義呢?

Apache Flink Source

如今(2018.11.5)Apache Flink中用於數據流驅動的Source Connector上面沒法定義PK,這樣在某些業務場景下會形成數據量較大,形成計算資源沒必要要的浪費,甚至有聚合結果不是用戶「指望」的狀況。咱們以雙流JOIN爲例來講明:

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
); 

CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  ) ;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
);

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_tab l 
        JOIN  sales_tab r 
        ON l.product_id = r.product_id;

INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;

代碼結構圖:

實現示意圖:

上圖描述了一個雙流JOIN的場景,雙流JOIN的底層實現會將左(L)右(R)兩面的數據都持久化到Apache Flink的State中,當L流入一條事件,首先會持久化到LState,而後在和RState中存儲的R中全部事件進行條件匹配,這樣的邏輯若是R流product_id爲P001的產品銷售記錄已經流入4條,L流的(P001, 48) 流入的時候會匹配4條事件流入下游(join_sink)。

問題

上面雙流JOIN的場景,咱們發現其實inventory和sales表是有業務的PK的,也就是兩張表上面的product_id是惟一的,可是因爲咱們在Sorure上面沒法定義PK字段,表上面全部的數據都會以append only的方式從source流入到下游計算節點JOIN,這樣就致使了JOIN內部全部product_id相同的記錄都會被匹配流入下游,上面的例子是 (P001, 48) 來到的時候,就向下遊流入了4條記錄,不難想象每一個product_id相同的記錄都會與歷史上全部事件進行匹配,進而操做下游數據壓力。

那麼這樣的壓力是必要的嗎?從業務的角度看,不是必要的,由於對於product_id相同的記錄,咱們只須要對左右兩邊最新的記錄進行JOIN匹配就能夠了。好比(P001, 48)到來了,業務上面只須要右流的(P001, 22)匹配就好,流入下游一條事件(P001, 48, 22)。 那麼目前在Apache Flink上面如何作到這樣的優化呢?

解決方案

上面的問題根本上咱們要構建一張有PK的動態表,這樣按照業務PK進行更新處理,咱們能夠在Source後面添加group by 操做生產一張有PK的動態表。以下:(以下DDL和LAST_VALUE 是Alibaba內部對Flink的增強,目前尚未推回社區,你們慢慢等待,你們想體驗能夠登陸阿里雲流計算平臺,詳見

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
  ) 

 CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  )
CREATE VIEW inventory_view AS
    SELECT 
    product_id,
    LAST_VALUE(product_count) AS product_count
    FROM inventory_tab
    GROUP BY product_id;

CREATE VIEW sales_view AS
    SELECT 
    product_id,
    LAST_VALUE(sales_count) AS sales_count
    FROM sales_tab
    GROUP BY product_id;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
)WITH (
    type = 'print'
) ;

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_view l 
        JOIN  sales_view r 
        ON l.product_id = r.product_id;

 INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;

代碼結構:

實現示意圖:

如上方式能夠將無PK的source通過一次節點變成有PK的動態表,以Apache Flink的retract機制和業務要素解決數據瓶頸,減小計算資源的消耗。

說明1: 上面方案LAST_VALUE是Alibaba內部對Flink的加強功能,社區尚未支持。

Apache Flink Sink

在Apache Flink上面能夠根據實際外部存儲的特色(是否支持PK),以及總體job的執行plan來動態推導Sink的執行模式,具體有以下三種類型:

  • Append 模式 - 該模式用戶在定義Sink的DDL時候不定義PK,在Apache Flink內部生成的全部只有INSERT語句;
  • Upsert 模式 - 該模式用戶在定義Sink的DDL時候能夠定義PK,在Apache Flink內部會根據事件打標(retract機制)生成INSERT/UPDATE和DELETE 語句,其中若是定義了PK, UPDATE語句按PK進行更新,若是沒有定義PK UPDATE會按整行更新;
  • Retract 模式 - 該模式下會產生INSERT和DELETE兩種信息,Sink Connector 根據這兩種信息構造對應的數據操做指令;

小結

本篇以MySQL爲例介紹了傳統數據庫的靜態查詢和利用MySQL的Trigger+DML操做來模擬持續查詢,並介紹了Apache Flink上面利用增量模式完成持續查詢,並以雙流JOIN爲例說明了持續查詢可能會遇到的問題,而且介紹Apache Flink覺得事件打標產生delete事件的方式解決持續查詢的問題,進而保證語義的正確性,完美的在流計算上支持續查詢。



本文做者:金竹

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索