Apache Flink 漫談系列 - 流表對偶(duality)性

摘要: 實際問題 不少大數據計算產品,都對用戶提供了SQL API,好比Hive, Spark, Flink等,那麼SQL做爲傳統關係數據庫的查詢語言,是應用在批查詢場景的。Hive和Spark本質上都是Batch的計算模式(在《Apache Flink 漫談系列 - 概述》咱們介紹過Spark是Micr...html

實際問題

不少大數據計算產品,都對用戶提供了SQL API,好比Hive, Spark, Flink等,那麼SQL做爲傳統關係數據庫的查詢語言,是應用在批查詢場景的。Hive和Spark本質上都是Batch的計算模式(在《Apache Flink 漫談系列 - 概述》咱們介紹過Spark是Micro Batching模式),提供SQL API很容易被人理解,可是Flink是純流(Native Streaming)的計算模式, 流與批在數據集和計算過程上有很大的區別,以下:mysql

  • 批查詢場景的特色 - 有限數據集,一次查詢返回一個計算結果就結束查詢
  • 流查詢場景的特色 - 無限數據集,一次查詢不斷修正計算結果,查詢永遠不結束

咱們發現批與流的查詢場景在數據集合和計算過程上都有很大的不一樣,那麼基於Native Streaming模式的Apache Flink爲啥也能爲用戶提供SQL API呢?sql

流與批的語義關係

咱們知道SQL都是做用於關係表的,在傳統數據庫中進行查詢時候,SQL所要查詢的表在觸發查詢時候數據是不會變化的,也就是說在查詢那一刻,表是一張靜態表,至關因而一個有限的批數據,這樣也說明SQL是源於對批計算的查詢的,那麼要回答Apache Flink爲啥也能爲用戶提供SQL API,咱們首先要理解流與批在語義層面的關係。咱們以一個具體示例說明,以下圖:數據庫

上圖展示的是一個攜帶時間戳和用戶名的點擊事件流,咱們先對這些事件流進行流式統計,同時在最後的流事件上觸發批計算。流計算中每接收一個數據都會觸發一次計算,咱們以2018/4/30 22:37:45 Mary到來那一時間切片看,不管是在流仍是批上計算結果都是6。也就是說在相同的數據源,相同的查詢邏輯下,流和批的計算結果是相同的。相同的SQL在流和批這兩種模式下,最終結果是一致的,那麼流與批在語義上是徹底相同的。session

流與表的關係

流與批在語義上是一致的,SQL是做用於表的,那麼要回答Apache Flink爲啥也能爲用戶提供SQL API的問題,就變成了流與表是否具備等價性,也就是本篇要重點介紹的爲何流表具備對偶(duality)性?以下圖所示,一張表能夠看作爲流嗎?一樣流能夠看作是一張表嗎?若是能夠須要怎樣的條件和變通?app

MySQL主備複製

在介紹流與表的關係以前咱們先聊聊MySQL的主備複製,binlog是MySQL實現主備複製的核心手段,簡單來講MySQL主備複製實現分紅三個步驟:函數

  • Master將改變(change logs)以二進制日誌事件(binary log events)形式記錄到二進制日誌(binlog)中;
  • Slave將Master的binary log events拷貝到它的中繼日誌(relay log);
  • Slave重作中繼日誌中的事件,將改變反映到數據;

具體以下圖所示:工具

binlog

接下來咱們從binlog模式,binlog格式以及經過查看binlog的具體內容來詳盡介紹binlog與表的關係。大數據

binlog模式

上面介紹的MySQL主備複製的核心手段是利用binlog實現的,那邊binlog會記錄那些內容呢?binlog記錄了數據庫全部的增、刪、更新等操做。MySQL支持三種方式記錄binlog:this

  • statement-based logging - Events contain SQL statements that produce data changes (inserts, updates, deletes);
  • row-based logging - Events describe changes to individual rows;
  • mixed-base logging - 該模式默認是statement-based,當遇到以下狀況會自動切換到row-based:

    • NDB存儲引擎,DML操做以row格式記錄;
    • 使用UUID()、USER()、CURRENT_USER()、FOUND_ROWS()等不肯定函數;
    • 使用Insert Delay語句;
    • 使用用戶自定義函數(UDF);
    • 使用臨時表;

binlog格式

咱們以row-based 模式爲例介紹一下binlog的存儲格式 ,全部的 binary log events都是字節序列,由兩部分組成:

  • event header
  • event data

關於event header和event data 的格式在數據庫的不一樣版本略有不一樣,但共同的地方以下:

+=====================================+
| event  | timestamp         0 : 4    |
| header +----------------------------+
|        | type_code         4 : 1    |
|        +----------------------------+
|        | server_id         5 : 4    |
|        +----------------------------+
|        | event_length      9 : 4    |
|        +----------------------------+
|        |不一樣版本不同(省略)          |
+=====================================+
| event  | fixed part                 |
| data   +----------------------------+
|        | variable part              |
+=====================================+

這裏有個值得咱們注意的地方就是在binlog的header中有一個屬性是timestamp,這個屬性是標識了change發生的前後順序,在備庫進行復制時候會嚴格按照時間順序進行log的重放。

binlog的生成

咱們以對MySQL進行實際操做的方式,直觀的介紹一下binlog的生成,binlog是二進制存儲的,下面咱們會利用工具查看binlog的文本內容。

  • 查看一下binlog是否打開:
show variables like 'log_bin'-> ;
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
  • 查看一下binlog的模式(我須要row-base模式):
show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
  • 清除現有的binlog
MySQL> reset master;
Query OK, 0 rows affected (0.00 sec)建立一張咱們作實驗的表MySQL> create table tab(
    ->    id INT NOT NULL AUTO_INCREMENT,
    ->    user VARCHAR(100) NOT NULL,
    ->    clicks INT NOT NULL,
    ->    PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.10 sec)

MySQL> show tables;
+-------------------+
| Tables_in_Apache Flinkdb |
+-------------------+
| tab         |
+-------------------+
1 row in set (0.00 sec)
  • 進行DML操做
MySQL> insert into tab(user, clicks) values ('Mary', 1);
Query OK, 1 row affected (0.03 sec)

MySQL> insert into tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Mary'
    -> ;
Query OK, 1 row affected (0.06 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> insert into tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> select * from tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      3 |
|  2 | Bob  |      2 |
|  3 | Llz  |      1 |
+----+------+--------+
3 rows in set (0.00 sec)
  • 查看正在操做的binlog
MySQL> show master status\G
*************************** 1. row ***************************
             File: binlog.000001
         Position: 2547
     Binlog_Do_DB: 
 Binlog_Ignore_DB: 
Executed_Gtid_Set: 
1 row in set (0.00 sec)

上面 binlog.000001 文件是咱們正在操做的binlog。

  • 查看binlog.000001文件的操做記錄
MySQL> show binlog events in 'binlog.000001';
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Log_name      | Pos  | Event_type     | Server_id | End_log_pos | Info                                                                                                                                                                |
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| binlog.000001 |    4 | Format_desc    |         1 |         124 | Server ver: 8.0.11, Binlog ver: 4                                                                                                                                   |
| binlog.000001 |  124 | Previous_gtids |         1 |         155 |                                                                                                                                                                     |
| binlog.000001 |  155 | Anonymous_Gtid |         1 |         228 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 |  228 | Query          |         1 |         368 | use `Apache Flinkdb`; DROP TABLE `tab` /* generated by server */ /* xid=22 */                                                                                        |
| binlog.000001 |  368 | Anonymous_Gtid |         1 |         443 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 |  443 | Query          |         1 |         670 | use `Apache Flinkdb`; create table tab(
   id INT NOT NULL AUTO_INCREMENT,
   user VARCHAR(100) NOT NULL,
   clicks INT NOT NULL,
   PRIMARY KEY (id)
) /* xid=23 */ |
| binlog.000001 |  670 | Anonymous_Gtid |         1 |         745 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 |  745 | Query          |         1 |         823 | BEGIN                                                                                                                                                               |
| binlog.000001 |  823 | Table_map      |         1 |         890 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 |  890 | Write_rows     |         1 |         940 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 |  940 | Xid            |         1 |         971 | COMMIT /* xid=25 */                                                                                                                                                 |
| binlog.000001 |  971 | Anonymous_Gtid |         1 |        1046 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 | 1046 | Query          |         1 |        1124 | BEGIN                                                                                                                                                               |
| binlog.000001 | 1124 | Table_map      |         1 |        1191 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 | 1191 | Write_rows     |         1 |        1240 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 | 1240 | Xid            |         1 |        1271 | COMMIT /* xid=26 */                                                                                                                                                 |
| binlog.000001 | 1271 | Anonymous_Gtid |         1 |        1346 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 | 1346 | Query          |         1 |        1433 | BEGIN                                                                                                                                                               |
| binlog.000001 | 1433 | Table_map      |         1 |        1500 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 | 1500 | Update_rows    |         1 |        1566 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 | 1566 | Xid            |         1 |        1597 | COMMIT /* xid=27 */                                                                                                                                                 |
| binlog.000001 | 1597 | Anonymous_Gtid |         1 |        1672 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 | 1672 | Query          |         1 |        1750 | BEGIN                                                                                                                                                               |
| binlog.000001 | 1750 | Table_map      |         1 |        1817 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 | 1817 | Write_rows     |         1 |        1866 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 | 1866 | Xid            |         1 |        1897 | COMMIT /* xid=28 */                                                                                                                                                 |
| binlog.000001 | 1897 | Anonymous_Gtid |         1 |        1972 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 | 1972 | Query          |         1 |        2059 | BEGIN                                                                                                                                                               |
| binlog.000001 | 2059 | Table_map      |         1 |        2126 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 | 2126 | Update_rows    |         1 |        2190 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 | 2190 | Xid            |         1 |        2221 | COMMIT /* xid=29 */                                                                                                                                                 |
| binlog.000001 | 2221 | Anonymous_Gtid |         1 |        2296 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
| binlog.000001 | 2296 | Query          |         1 |        2383 | BEGIN                                                                                                                                                               |
| binlog.000001 | 2383 | Table_map      |         1 |        2450 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
| binlog.000001 | 2450 | Update_rows    |         1 |        2516 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
| binlog.000001 | 2516 | Xid            |         1 |        2547 | COMMIT /* xid=30 */                                                                                                                                                 |
+---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
36 rows in set (0.00 sec)

上面咱們進行了3次insert和3次update,那麼在binlog中咱們看到了三條Write_rows和三條Update_rows,而且在記錄順序和操做順序保持一致,接下來咱們看看Write_rows和Update_rows的具體timestamp和data明文。

  • 導出明文
sudo MySQLbinlog --start-datetime='2018-04-29 00:00:03' --stop-datetime='2018-05-02 00:30:00' --base64-output=decode-rows -v /usr/local/MySQL/data/binlog.000001 > ~/binlog.txt

打開binlog.txt 內容以下:

/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180430 22:29:33 server id 1  end_log_pos 124 CRC32 0xff61797c  Start: binlog v 4, server v 8.0.11 created 180430 22:29:33 at startup
# Warning: this binlog is either in use or was not closed properly.
ROLLBACK/*!*/;
# at 124
#180430 22:29:33 server id 1  end_log_pos 155 CRC32 0x629ae755  Previous-GTIDs
# [empty]
# at 155
#180430 22:32:11 server id 1  end_log_pos 228 CRC32 0xbde49fca  Anonymous_GTID  last_committed=0        sequence_number=1       rbr_only=no     original_committed_timestamp=1525098731207902   immediate_commit_timestamp=1525098731207902     transaction_length=213
# original_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
# immediate_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098731207902*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 228
#180430 22:32:11 server id 1  end_log_pos 368 CRC32 0xe5f330e7  Query   thread_id=9     exec_time=0     error_code=0    Xid = 22
use `Apache Flinkdb`/*!*/;
SET TIMESTAMP=1525098731/*!*/;
SET @@session.pseudo_thread_id=9/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1168113696/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=255,@@session.collation_connection=255,@@session.collation_server=255/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/*!80005 SET @@session.default_collation_for_utf8mb4=255*//*!*/;
DROP TABLE `tab` /* generated by server */
/*!*/;
# at 368
#180430 22:32:21 server id 1  end_log_pos 443 CRC32 0x50e5acb7  Anonymous_GTID  last_committed=1        sequence_number=2       rbr_only=no     original_committed_timestamp=1525098741628960   immediate_commit_timestamp=1525098741628960     transaction_length=302
# original_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
# immediate_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098741628960*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 443
#180430 22:32:21 server id 1  end_log_pos 670 CRC32 0xe1353dd6  Query   thread_id=9     exec_time=0     error_code=0    Xid = 23
SET TIMESTAMP=1525098741/*!*/;
create table tab(
   id INT NOT NULL AUTO_INCREMENT,
   user VARCHAR(100) NOT NULL,
   clicks INT NOT NULL,
   PRIMARY KEY (id)
)
/*!*/;
# at 670
#180430 22:36:53 server id 1  end_log_pos 745 CRC32 0xcf436fbb  Anonymous_GTID  last_committed=2        sequence_number=3       rbr_only=yes    original_committed_timestamp=1525099013988373   immediate_commit_timestamp=1525099013988373     transaction_length=301
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
# immediate_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099013988373*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 745
#180430 22:36:53 server id 1  end_log_pos 823 CRC32 0x71c64dd2  Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099013/*!*/;
BEGIN
/*!*/;
# at 823
#180430 22:36:53 server id 1  end_log_pos 890 CRC32 0x63792f6b  Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 890
#180430 22:36:53 server id 1  end_log_pos 940 CRC32 0xf2dade22  Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=1
###   @2='Mary'
###   @3=1
# at 940
#180430 22:36:53 server id 1  end_log_pos 971 CRC32 0x7db3e61e  Xid = 25
COMMIT/*!*/;
# at 971
#180430 22:37:06 server id 1  end_log_pos 1046 CRC32 0xd05dd12c         Anonymous_GTID  last_committed=3        sequence_number=4       rbr_only=yes    original_committed_timestamp=1525099026328547   immediate_commit_timestamp=1525099026328547     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
# immediate_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099026328547*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1046
#180430 22:37:06 server id 1  end_log_pos 1124 CRC32 0x80f259e0         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099026/*!*/;
BEGIN
/*!*/;
# at 1124
#180430 22:37:06 server id 1  end_log_pos 1191 CRC32 0x255903ba         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1191
#180430 22:37:06 server id 1  end_log_pos 1240 CRC32 0xe76bfc79         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=2
###   @2='Bob'
###   @3=1
# at 1240
#180430 22:37:06 server id 1  end_log_pos 1271 CRC32 0x83cddfef         Xid = 26
COMMIT/*!*/;
# at 1271
#180430 22:37:15 server id 1  end_log_pos 1346 CRC32 0x7095baee         Anonymous_GTID  last_committed=4        sequence_number=5       rbr_only=yes    original_committed_timestamp=1525099035811597   immediate_commit_timestamp=1525099035811597     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
# immediate_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099035811597*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1346
#180430 22:37:15 server id 1  end_log_pos 1433 CRC32 0x70ef97e2         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099035/*!*/;
BEGIN
/*!*/;
# at 1433
#180430 22:37:15 server id 1  end_log_pos 1500 CRC32 0x75f1f399         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1500
#180430 22:37:15 server id 1  end_log_pos 1566 CRC32 0x256bd4b8         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=1
### SET
###   @1=1
###   @2='Mary'
###   @3=2
# at 1566
#180430 22:37:15 server id 1  end_log_pos 1597 CRC32 0x93c86579         Xid = 27
COMMIT/*!*/;
# at 1597
#180430 22:37:27 server id 1  end_log_pos 1672 CRC32 0xe8bd63e7         Anonymous_GTID  last_committed=5        sequence_number=6       rbr_only=yes    original_committed_timestamp=1525099047219517   immediate_commit_timestamp=1525099047219517     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
# immediate_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099047219517*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1672
#180430 22:37:27 server id 1  end_log_pos 1750 CRC32 0x5356c3c7         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099047/*!*/;
BEGIN
/*!*/;
# at 1750
#180430 22:37:27 server id 1  end_log_pos 1817 CRC32 0x37e6b1ce         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1817
#180430 22:37:27 server id 1  end_log_pos 1866 CRC32 0x6ab1bbe6         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=3
###   @2='Llz'
###   @3=1
# at 1866
#180430 22:37:27 server id 1  end_log_pos 1897 CRC32 0x3b62b153         Xid = 28
COMMIT/*!*/;
# at 1897
#180430 22:37:36 server id 1  end_log_pos 1972 CRC32 0x603134c1         Anonymous_GTID  last_committed=6        sequence_number=7       rbr_only=yes    original_committed_timestamp=1525099056866022   immediate_commit_timestamp=1525099056866022     transaction_length=324
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
# immediate_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099056866022*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1972
#180430 22:37:36 server id 1  end_log_pos 2059 CRC32 0xe17df4e4         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099056/*!*/;
BEGIN
/*!*/;
# at 2059
#180430 22:37:36 server id 1  end_log_pos 2126 CRC32 0x53888b05         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2126
#180430 22:37:36 server id 1  end_log_pos 2190 CRC32 0x85f34996         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=2
###   @2='Bob'
###   @3=1
### SET
###   @1=2
###   @2='Bob'
###   @3=2
# at 2190
#180430 22:37:36 server id 1  end_log_pos 2221 CRC32 0x877f1e23         Xid = 29
COMMIT/*!*/;
# at 2221
#180430 22:37:45 server id 1  end_log_pos 2296 CRC32 0xfbc7e868         Anonymous_GTID  last_committed=7        sequence_number=8       rbr_only=yes    original_committed_timestamp=1525099065089940   immediate_commit_timestamp=1525099065089940     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
# immediate_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099065089940*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2296
#180430 22:37:45 server id 1  end_log_pos 2383 CRC32 0x8a514364         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099065/*!*/;
BEGIN
/*!*/;
# at 2383
#180430 22:37:45 server id 1  end_log_pos 2450 CRC32 0xdf18ca60         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2450
#180430 22:37:45 server id 1  end_log_pos 2516 CRC32 0xd50de69f         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=2
### SET
###   @1=1
###   @2='Mary'
###   @3=3
# at 2516
#180430 22:37:45 server id 1  end_log_pos 2547 CRC32 0x94f89393         Xid = 30
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by MySQLbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
  • 梳理操做和binlog的記錄關係
    DML binlog-header(timestamp) data
    insert into blink_tab(user, clicks) values ('Mary', 1);

    1525099013

    (2018/4/30 22:36:53)

    ## INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=1
    insert into blink_tab(user, clicks) values ('Bob', 1);

    1525099026

    (2018/4/30 22:37:06)

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=1 
     update blink_tab set clicks=2 where user='Mary';

     1525099035

    (2018/4/30 22:37:15)

     ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=1
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=2
     insert into blink_tab(user, clicks) values ('Llz', 1);

    1525099047

    (2018/4/30 22:37:27) 

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=3
    ### @2='Llz'
    ### @3=1 
     update blink_tab set clicks=2 where user='Bob';

     1525099056

    (2018/4/30 22:37:36)

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=2
    ### @2='Bob'
    ### @3=1
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=2 
     update blink_tab set clicks=3 where user='Mary';

    1525099065

    (2018/4/30 22:37:45) 

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=2
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=3 
  • 簡化一下binlog
timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 LIz 1
1525099056 Bob 2
1525099065 Mary 3
  • replay binlog會獲得以下表數據(按timestamp順序)
user clicks
Mary 3
Bob 2
LIz 1
  • 表與binlog的關係簡單示意以下

流表對偶(duality)性

前面我花費了一些時間介紹了MySQL主備複製機制和binlog的數據格式,binlog中攜帶時間戳,咱們將全部表的操做都按時間進行記錄下來造成binlog,而對binlog的event進行重放的過程就是流數據處理的過程,重放的結果偏偏又造成了一張表。也就是表的操做會造成攜帶時間的事件流,對流的處理又會造成一張不斷變化的表,表和流具備等價性,能夠互轉。隨着時間推移,DML操做不斷進行,那麼表的內容也不斷變化,具體以下:

如上圖所示內容,流和表具有相同的特徵:

  • 表 - Schema,Data,DML操做時間
  • 流 - Schema,Data, Data處理時間

咱們發現,雖然大多數表上面沒有明確的顯示出DML操做時間,但本質上數據庫系統裏面是有數據操做時間信息的,這個和流上數據的處理時間(processing time)/產生時間(event-time)相對應。流與表具有相同的特徵,能夠信息無損的相互轉換,我稱之爲流表對偶(duality)性。

上面咱們描述的表,在流上稱之爲動態表(Dynamic Table),緣由是在流上面任何一個事件的到來都是對錶上數據的一次更新(包括插入和刪除),表的內容是不斷的變化的,任何一個時刻流的狀態和表的快照一一對應。流與動態表(Dynamic Table)在時間維度上面具備等價性,這種等價性咱們稱之爲流和動態表(Dynamic Table)的對偶(duality)性。

小結

本篇主要介紹Apache Flink做爲一個流計算平臺爲何能夠爲用戶提供SQL API。其根本緣由是若是將流上的數據看作是結構化的數據,流任務的核心是將一個具備時間屬性的結構化數據變成一樣具備時間屬性的另外一個結構化數據,而表的數據變化過程binlog偏偏就是一份具備時間屬性的流數據,流與表具備信息無損的相互轉換的特性,這種流表對偶性也決定了Apache Flink能夠採用SQL做爲流任務的開發語言。

原文連接

相關文章
相關標籤/搜索