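Many big data processing products, such as Hive, Spark, and Flink, offer users a SQL API. SQL, the query language of traditional relational databases, was designed for batch queries. Hive and Spark are essentially batch engines (in the overview article of this series, 《Apache Flink 漫談系列 - 概述》, we explained that Spark uses micro-batching), so it is easy to see why they provide SQL APIs. Flink, however, is a pure (native) streaming engine, and streams and batches differ greatly in both their datasets and their computation: a batch job computes once over a bounded dataset and produces a final result, while a streaming job computes continuously over an unbounded dataset and keeps producing results.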
Batch and stream queries therefore differ greatly in both datasets and computation. So why can Apache Flink, which is built on native streaming, still offer its users a SQL API?
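SQL operates on relational tables. When a traditional database runs a query, the data in the queried table does not change at the moment the query is triggered. In other words, at query time the table is static, equivalent to a finite batch of data, which shows that SQL originated as a query language for batch computation. To answer why Apache Flink can offer a SQL API, we first need to understand how streams and batches relate at the semantic level. Let's use a concrete example, shown in the figure below: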
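The figure above shows a stream of click events, each carrying a timestamp and a user name. We run a streaming aggregation over these events and, in addition, trigger a batch computation at the last event of the stream. A streaming computation is triggered every time a record arrives. If we look at the time slice in which the event "2018/4/30 22:37:45 Mary" arrives, the result is 6 in both the streaming and the batch computation. That is, with the same data source and the same query logic, streaming and batch produce the same result. Since the same SQL yields the same final result in both modes, stream and batch are semantically identical.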
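To make this concrete, here is a minimal Python sketch. It assumes the result of 6 in the figure is a plain `COUNT(*)` over the click events, and it assumes the six events are the same clicks used in the MySQL experiment later in this article; the function names are illustrative only.

```python
# Six click events (timestamp, user). ASSUMED to match the figure; they are the
# same clicks used in the MySQL experiment later in this article.
CLICKS = [
    ("2018-04-30 22:36:53", "Mary"),
    ("2018-04-30 22:37:06", "Bob"),
    ("2018-04-30 22:37:15", "Mary"),
    ("2018-04-30 22:37:27", "Llz"),
    ("2018-04-30 22:37:36", "Bob"),
    ("2018-04-30 22:37:45", "Mary"),
]


def stream_count(events):
    """Streaming: update a running COUNT(*) on every arriving event and emit it."""
    count = 0
    emitted = []
    for _ts, _user in events:
        count += 1
        emitted.append(count)
    return emitted


def batch_count(events):
    """Batch: evaluate COUNT(*) once over the complete, bounded input."""
    return len(events)


if __name__ == "__main__":
    print(stream_count(CLICKS))  # [1, 2, 3, 4, 5, 6]: one result per event
    print(batch_count(CLICKS))   # 6: a single result
```

The streaming version emits an intermediate result per event, but its result at the last event equals the batch result, which is exactly the point made above.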
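Stream and batch are semantically consistent, and SQL operates on tables, so the question of why Apache Flink can offer a SQL API becomes whether streams and tables are equivalent. That is the focus of this article: why do streams and tables have a duality? As shown in the figure below, can a table be viewed as a stream? Likewise, can a stream be viewed as a table? If so, what conditions and adaptations are needed?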
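Before discussing the relationship between streams and tables, let's look at MySQL master-slave replication. The binlog is the core mechanism behind it. In short, MySQL replication works in three steps, as illustrated in the accompanying figure:

1. The master records every data change in its binary log (binlog).
2. The replica's I/O thread copies the master's binlog events into its own relay log.
3. The replica's SQL thread replays the events in the relay log, applying the same changes to the replica's data.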
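The three replication steps can be modeled in a few lines of Python. This is a toy sketch under simplifying assumptions: a table is a dict keyed by primary key, every change is an upsert (deletes are not modeled), and `Master`, `Replica`, `io_thread`, and `sql_thread` are hypothetical names that only mirror the roles described above; they are not MySQL internals.

```python
from dataclasses import dataclass, field


@dataclass
class Master:
    table: dict = field(default_factory=dict)   # primary key -> row
    binlog: list = field(default_factory=list)  # (timestamp, key, row) in write order

    def upsert(self, ts, key, row):
        # Step 1: apply the change locally and append it to the binlog.
        self.table[key] = dict(row)
        self.binlog.append((ts, key, dict(row)))


@dataclass
class Replica:
    table: dict = field(default_factory=dict)
    relay_log: list = field(default_factory=list)
    applied: int = 0  # number of relay-log events already replayed

    def io_thread(self, master):
        # Step 2: copy binlog events that have not been fetched yet into the relay log.
        self.relay_log.extend(master.binlog[len(self.relay_log):])

    def sql_thread(self):
        # Step 3: replay pending relay-log events in the order they were written.
        for _ts, key, row in self.relay_log[self.applied:]:
            self.table[key] = dict(row)
        self.applied = len(self.relay_log)


def run_demo():
    master, replica = Master(), Replica()
    master.upsert(1525099013, 1, {"user": "Mary", "clicks": 1})
    master.upsert(1525099026, 2, {"user": "Bob", "clicks": 1})
    replica.io_thread(master)
    replica.sql_thread()
    master.upsert(1525099035, 1, {"user": "Mary", "clicks": 2})
    replica.io_thread(master)
    replica.sql_thread()
    return master, replica


if __name__ == "__main__":
    master, replica = run_demo()
    print(replica.table == master.table)  # True
```

Because the replica applies events strictly in log order, it converges to the master's table; this ordered replay is the property the rest of the article builds on.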
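Next, we examine the relationship between the binlog and tables in detail by looking at the binlog modes, the binlog format, and the actual contents of a binlog.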
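MySQL replication, as described above, is built on the binlog. So what does the binlog record? It records every insert, delete, and update performed on the database. MySQL supports three ways of recording the binlog:

- statement-based logging: records the SQL statements that modify data; the replica re-executes them.
- row-based logging: records how each affected row changed; the replica applies those row changes directly.
- mixed logging: statement-based by default, but MySQL automatically switches to row-based logging for statements that cannot be replicated safely as statements (for example, statements whose result is non-deterministic).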
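The difference between statement-based and row-based logging can be sketched with Python's built-in `sqlite3` module, using an in-memory SQLite database as a stand-in for a replica. SQLite is only a stand-in here, this is not how MySQL applies events internally, and the event shapes (`STATEMENT_EVENT`, `ROW_EVENT`) are assumptions modeled on the mysqlbinlog output shown later.

```python
import sqlite3

SCHEMA = "CREATE TABLE tab (id INTEGER PRIMARY KEY, user TEXT NOT NULL, clicks INTEGER NOT NULL)"

# Statement-based: the log entry is the SQL text, which the replica re-executes.
STATEMENT_EVENT = "UPDATE tab SET clicks = 2 WHERE user = 'Mary'"

# Row-based: the log entry holds the row images before and after the change,
# laid out like the @1/@2/@3 columns mysqlbinlog prints (illustrative shape).
ROW_EVENT = {"before": (1, "Mary", 1), "after": (1, "Mary", 2)}


def new_replica():
    """An in-memory SQLite database standing in for a replica's copy of tab."""
    db = sqlite3.connect(":memory:")
    db.execute(SCHEMA)
    db.execute("INSERT INTO tab VALUES (1, 'Mary', 1)")
    return db


def apply_statement(db, sql):
    # Re-run the original statement on the replica.
    db.execute(sql)


def apply_row(db, event):
    # Find the row matching the before-image and overwrite it with the after-image.
    db.execute(
        "UPDATE tab SET id = ?, user = ?, clicks = ? "
        "WHERE id = ? AND user = ? AND clicks = ?",
        event["after"] + event["before"],
    )


def snapshot(db):
    return db.execute("SELECT id, user, clicks FROM tab ORDER BY id").fetchall()


if __name__ == "__main__":
    by_statement, by_row = new_replica(), new_replica()
    apply_statement(by_statement, STATEMENT_EVENT)
    apply_row(by_row, ROW_EVENT)
    print(snapshot(by_statement), snapshot(by_row))  # [(1, 'Mary', 2)] [(1, 'Mary', 2)]
```

For a deterministic statement like this one, both replicas end in the same state. A row event, however, carries the concrete before and after values, so replaying it never depends on re-evaluating the statement, which is why row-based events read naturally as a stream of data changes.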
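Taking row-based mode as an example, let's look at how the binlog is stored. Every binary log event is a sequence of bytes made of two parts:

- event header: information common to all events, such as when the event was written and its type.
- event data: the event-specific payload, consisting of a fixed part followed by a variable part.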
The exact formats of the event header and event data differ slightly between MySQL versions, but they share the following common layout:
```
+=====================================+
| event  | timestamp         0 : 4    |
| header +----------------------------+
|        | type_code         4 : 1    |
|        +----------------------------+
|        | server_id         5 : 4    |
|        +----------------------------+
|        | event_length      9 : 4    |
|        +----------------------------+
|        | varies by version (omitted)|
+=====================================+
| event  | fixed part                 |
| data   +----------------------------+
|        | variable part              |
+=====================================+
```
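To see what "a sequence of bytes" means for the header, the sketch below packs and unpacks only the four common fields from the diagram (offset : length) with Python's `struct` module. Assumptions: integers are little-endian, as in MySQL's binlog format; the bytes are synthetic rather than read from a real binlog file; and type code 30 for a row-insert event is our assumption. The version-specific fields after `event_length` are ignored.

```python
import struct

# timestamp (4 bytes) | type_code (1) | server_id (4) | event_length (4).
# "<" means little-endian with no padding, so the offsets are 0, 4, 5, 9 as above.
COMMON_HEADER = struct.Struct("<IBII")


def parse_common_header(buf):
    timestamp, type_code, server_id, event_length = COMMON_HEADER.unpack_from(buf, 0)
    return {
        "timestamp": timestamp,
        "type_code": type_code,
        "server_id": server_id,
        "event_length": event_length,
    }


# Synthetic header, NOT taken from a real file: 1525099013 is the first insert's
# timestamp in this article, 30 is an ASSUMED Write_rows type code, and 50 is the
# Write_rows event length shown later (end_log_pos 940 - pos 890).
SAMPLE_HEADER = COMMON_HEADER.pack(1525099013, 30, 1, 50)

if __name__ == "__main__":
    print(COMMON_HEADER.size)                  # 13
    print(parse_common_header(SAMPLE_HEADER))  # {'timestamp': 1525099013, 'type_code': 30, 'server_id': 1, 'event_length': 50}
```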
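Note that the binlog event header contains a timestamp attribute, which records when each change happened. When the replica replicates, it replays the log events strictly in the order they were written, that is, in the order the changes occurred.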
Let's operate on MySQL directly to see how the binlog is generated. The binlog is stored in binary, so below we use a tool to view its contents as text.
First, check that the binlog is enabled and that it uses the ROW format:

```
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
```
Reset the binlog so the experiment starts from an empty log file:

```
mysql> reset master;
Query OK, 0 rows affected (0.00 sec)
```

Create a table for our experiment:

```
mysql> create table tab(
    -> id INT NOT NULL AUTO_INCREMENT,
    -> user VARCHAR(100) NOT NULL,
    -> clicks INT NOT NULL,
    -> PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.10 sec)

mysql> show tables;
+--------------------------+
| Tables_in_Apache Flinkdb |
+--------------------------+
| tab                      |
+--------------------------+
1 row in set (0.00 sec)
```
```
mysql> insert into tab(user, clicks) values ('Mary', 1);
Query OK, 1 row affected (0.03 sec)

mysql> insert into tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

mysql> update tab set clicks=2 where user='Mary';
Query OK, 1 row affected (0.06 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> insert into tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

mysql> update tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> update tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> select * from tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      3 |
|  2 | Bob  |      2 |
|  3 | Llz  |      1 |
+----+------+--------+
3 rows in set (0.00 sec)
```
```
mysql> show master status\G
*************************** 1. row ***************************
             File: binlog.000001
         Position: 2547
     Binlog_Do_DB:
 Binlog_Ignore_DB:
Executed_Gtid_Set:
1 row in set (0.00 sec)
```
The binlog.000001 file shown above is the binlog currently being written. Let's list the events it contains:
```
mysql> show binlog events in 'binlog.000001';
+---------------+------+----------------+-----------+-------------+------
| Log_name      | Pos  | Event_type     | Server_id | End_log_pos | Info
+---------------+------+----------------+-----------+-------------+------
| binlog.000001 |    4 | Format_desc    |         1 |         124 | Server ver: 8.0.11, Binlog ver: 4
| binlog.000001 |  124 | Previous_gtids |         1 |         155 |
| binlog.000001 |  155 | Anonymous_Gtid |         1 |         228 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 |  228 | Query          |         1 |         368 | use `Apache Flinkdb`; DROP TABLE `tab` /* generated by server */ /* xid=22 */
| binlog.000001 |  368 | Anonymous_Gtid |         1 |         443 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 |  443 | Query          |         1 |         670 | use `Apache Flinkdb`; create table tab( id INT NOT NULL AUTO_INCREMENT, user VARCHAR(100) NOT NULL, clicks INT NOT NULL, PRIMARY KEY (id) ) /* xid=23 */
| binlog.000001 |  670 | Anonymous_Gtid |         1 |         745 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 |  745 | Query          |         1 |         823 | BEGIN
| binlog.000001 |  823 | Table_map      |         1 |         890 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 |  890 | Write_rows     |         1 |         940 | table_id: 96 flags: STMT_END_F
| binlog.000001 |  940 | Xid            |         1 |         971 | COMMIT /* xid=25 */
| binlog.000001 |  971 | Anonymous_Gtid |         1 |        1046 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 | 1046 | Query          |         1 |        1124 | BEGIN
| binlog.000001 | 1124 | Table_map      |         1 |        1191 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 | 1191 | Write_rows     |         1 |        1240 | table_id: 96 flags: STMT_END_F
| binlog.000001 | 1240 | Xid            |         1 |        1271 | COMMIT /* xid=26 */
| binlog.000001 | 1271 | Anonymous_Gtid |         1 |        1346 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 | 1346 | Query          |         1 |        1433 | BEGIN
| binlog.000001 | 1433 | Table_map      |         1 |        1500 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 | 1500 | Update_rows    |         1 |        1566 | table_id: 96 flags: STMT_END_F
| binlog.000001 | 1566 | Xid            |         1 |        1597 | COMMIT /* xid=27 */
| binlog.000001 | 1597 | Anonymous_Gtid |         1 |        1672 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 | 1672 | Query          |         1 |        1750 | BEGIN
| binlog.000001 | 1750 | Table_map      |         1 |        1817 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 | 1817 | Write_rows     |         1 |        1866 | table_id: 96 flags: STMT_END_F
| binlog.000001 | 1866 | Xid            |         1 |        1897 | COMMIT /* xid=28 */
| binlog.000001 | 1897 | Anonymous_Gtid |         1 |        1972 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 | 1972 | Query          |         1 |        2059 | BEGIN
| binlog.000001 | 2059 | Table_map      |         1 |        2126 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 | 2126 | Update_rows    |         1 |        2190 | table_id: 96 flags: STMT_END_F
| binlog.000001 | 2190 | Xid            |         1 |        2221 | COMMIT /* xid=29 */
| binlog.000001 | 2221 | Anonymous_Gtid |         1 |        2296 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
| binlog.000001 | 2296 | Query          |         1 |        2383 | BEGIN
| binlog.000001 | 2383 | Table_map      |         1 |        2450 | table_id: 96 (Apache Flinkdb.tab)
| binlog.000001 | 2450 | Update_rows    |         1 |        2516 | table_id: 96 flags: STMT_END_F
| binlog.000001 | 2516 | Xid            |         1 |        2547 | COMMIT /* xid=30 */
+---------------+------+----------------+-----------+-------------+------
36 rows in set (0.00 sec)
```
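Above we performed 3 inserts and 3 updates, and in the binlog we see three Write_rows events and three Update_rows events, recorded in the same order as the operations. Next, let's look at the timestamps and the plaintext row data of these Write_rows and Update_rows events.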
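Use the mysqlbinlog tool to decode the binlog, including its row events, into readable text:

```
sudo mysqlbinlog --start-datetime='2018-04-29 00:00:03' --stop-datetime='2018-05-02 00:30:00' --base64-output=decode-rows -v /usr/local/mysql/data/binlog.000001 > ~/binlog.txt
```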
Open binlog.txt; its contents are as follows:
```
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180430 22:29:33 server id 1 end_log_pos 124 CRC32 0xff61797c Start: binlog v 4, server v 8.0.11 created 180430 22:29:33 at startup
# Warning: this binlog is either in use or was not closed properly.
ROLLBACK/*!*/;
# at 124
#180430 22:29:33 server id 1 end_log_pos 155 CRC32 0x629ae755 Previous-GTIDs
# [empty]
# at 155
#180430 22:32:11 server id 1 end_log_pos 228 CRC32 0xbde49fca Anonymous_GTID last_committed=0 sequence_number=1 rbr_only=no original_committed_timestamp=1525098731207902 immediate_commit_timestamp=1525098731207902 transaction_length=213
# original_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
# immediate_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098731207902*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 228
#180430 22:32:11 server id 1 end_log_pos 368 CRC32 0xe5f330e7 Query thread_id=9 exec_time=0 error_code=0 Xid = 22
use `Apache Flinkdb`/*!*/;
SET TIMESTAMP=1525098731/*!*/;
SET @@session.pseudo_thread_id=9/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1168113696/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=255,@@session.collation_connection=255,@@session.collation_server=255/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/*!80005 SET @@session.default_collation_for_utf8mb4=255*//*!*/;
DROP TABLE `tab` /* generated by server */
/*!*/;
# at 368
#180430 22:32:21 server id 1 end_log_pos 443 CRC32 0x50e5acb7 Anonymous_GTID last_committed=1 sequence_number=2 rbr_only=no original_committed_timestamp=1525098741628960 immediate_commit_timestamp=1525098741628960 transaction_length=302
# original_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
# immediate_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098741628960*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 443
#180430 22:32:21 server id 1 end_log_pos 670 CRC32 0xe1353dd6 Query thread_id=9 exec_time=0 error_code=0 Xid = 23
SET TIMESTAMP=1525098741/*!*/;
create table tab( id INT NOT NULL AUTO_INCREMENT, user VARCHAR(100) NOT NULL, clicks INT NOT NULL, PRIMARY KEY (id) )
/*!*/;
# at 670
#180430 22:36:53 server id 1 end_log_pos 745 CRC32 0xcf436fbb Anonymous_GTID last_committed=2 sequence_number=3 rbr_only=yes original_committed_timestamp=1525099013988373 immediate_commit_timestamp=1525099013988373 transaction_length=301
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
# immediate_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099013988373*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 745
#180430 22:36:53 server id 1 end_log_pos 823 CRC32 0x71c64dd2 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099013/*!*/;
BEGIN
/*!*/;
# at 823
#180430 22:36:53 server id 1 end_log_pos 890 CRC32 0x63792f6b Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 890
#180430 22:36:53 server id 1 end_log_pos 940 CRC32 0xf2dade22 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
### @1=1
### @2='Mary'
### @3=1
# at 940
#180430 22:36:53 server id 1 end_log_pos 971 CRC32 0x7db3e61e Xid = 25
COMMIT/*!*/;
# at 971
#180430 22:37:06 server id 1 end_log_pos 1046 CRC32 0xd05dd12c Anonymous_GTID last_committed=3 sequence_number=4 rbr_only=yes original_committed_timestamp=1525099026328547 immediate_commit_timestamp=1525099026328547 transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
# immediate_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099026328547*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1046
#180430 22:37:06 server id 1 end_log_pos 1124 CRC32 0x80f259e0 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099026/*!*/;
BEGIN
/*!*/;
# at 1124
#180430 22:37:06 server id 1 end_log_pos 1191 CRC32 0x255903ba Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1191
#180430 22:37:06 server id 1 end_log_pos 1240 CRC32 0xe76bfc79 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
### @1=2
### @2='Bob'
### @3=1
# at 1240
#180430 22:37:06 server id 1 end_log_pos 1271 CRC32 0x83cddfef Xid = 26
COMMIT/*!*/;
# at 1271
#180430 22:37:15 server id 1 end_log_pos 1346 CRC32 0x7095baee Anonymous_GTID last_committed=4 sequence_number=5 rbr_only=yes original_committed_timestamp=1525099035811597 immediate_commit_timestamp=1525099035811597 transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
# immediate_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099035811597*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1346
#180430 22:37:15 server id 1 end_log_pos 1433 CRC32 0x70ef97e2 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099035/*!*/;
BEGIN
/*!*/;
# at 1433
#180430 22:37:15 server id 1 end_log_pos 1500 CRC32 0x75f1f399 Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1500
#180430 22:37:15 server id 1 end_log_pos 1566 CRC32 0x256bd4b8 Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
### @1=1
### @2='Mary'
### @3=1
### SET
### @1=1
### @2='Mary'
### @3=2
# at 1566
#180430 22:37:15 server id 1 end_log_pos 1597 CRC32 0x93c86579 Xid = 27
COMMIT/*!*/;
# at 1597
#180430 22:37:27 server id 1 end_log_pos 1672 CRC32 0xe8bd63e7 Anonymous_GTID last_committed=5 sequence_number=6 rbr_only=yes original_committed_timestamp=1525099047219517 immediate_commit_timestamp=1525099047219517 transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
# immediate_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099047219517*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1672
#180430 22:37:27 server id 1 end_log_pos 1750 CRC32 0x5356c3c7 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099047/*!*/;
BEGIN
/*!*/;
# at 1750
#180430 22:37:27 server id 1 end_log_pos 1817 CRC32 0x37e6b1ce Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1817
#180430 22:37:27 server id 1 end_log_pos 1866 CRC32 0x6ab1bbe6 Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
### @1=3
### @2='Llz'
### @3=1
# at 1866
#180430 22:37:27 server id 1 end_log_pos 1897 CRC32 0x3b62b153 Xid = 28
COMMIT/*!*/;
# at 1897
#180430 22:37:36 server id 1 end_log_pos 1972 CRC32 0x603134c1 Anonymous_GTID last_committed=6 sequence_number=7 rbr_only=yes original_committed_timestamp=1525099056866022 immediate_commit_timestamp=1525099056866022 transaction_length=324
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
# immediate_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099056866022*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1972
#180430 22:37:36 server id 1 end_log_pos 2059 CRC32 0xe17df4e4 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099056/*!*/;
BEGIN
/*!*/;
# at 2059
#180430 22:37:36 server id 1 end_log_pos 2126 CRC32 0x53888b05 Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2126
#180430 22:37:36 server id 1 end_log_pos 2190 CRC32 0x85f34996 Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
### @1=2
### @2='Bob'
### @3=1
### SET
### @1=2
### @2='Bob'
### @3=2
# at 2190
#180430 22:37:36 server id 1 end_log_pos 2221 CRC32 0x877f1e23 Xid = 29
COMMIT/*!*/;
# at 2221
#180430 22:37:45 server id 1 end_log_pos 2296 CRC32 0xfbc7e868 Anonymous_GTID last_committed=7 sequence_number=8 rbr_only=yes original_committed_timestamp=1525099065089940 immediate_commit_timestamp=1525099065089940 transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
# immediate_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099065089940*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2296
#180430 22:37:45 server id 1 end_log_pos 2383 CRC32 0x8a514364 Query thread_id=9 exec_time=0 error_code=0
SET TIMESTAMP=1525099065/*!*/;
BEGIN
/*!*/;
# at 2383
#180430 22:37:45 server id 1 end_log_pos 2450 CRC32 0xdf18ca60 Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2450
#180430 22:37:45 server id 1 end_log_pos 2516 CRC32 0xd50de69f Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
### @1=1
### @2='Mary'
### @3=2
### SET
### @1=1
### @2='Mary'
### @3=3
# at 2516
#180430 22:37:45 server id 1 end_log_pos 2547 CRC32 0x94f89393 Xid = 30
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
```
Lining up each DML statement with the timestamp in its binlog event header and the row data the event recorded gives:

DML | binlog header timestamp | data |
---|---|---|
`insert into tab(user, clicks) values ('Mary', 1);` | 1525099013 (2018/4/30 22:36:53) | `INSERT INTO tab SET @1=1 @2='Mary' @3=1` |
`insert into tab(user, clicks) values ('Bob', 1);` | 1525099026 (2018/4/30 22:37:06) | `INSERT INTO tab SET @1=2 @2='Bob' @3=1` |
`update tab set clicks=2 where user='Mary';` | 1525099035 (2018/4/30 22:37:15) | `UPDATE tab WHERE @1=1 @2='Mary' @3=1 SET @1=1 @2='Mary' @3=2` |
`insert into tab(user, clicks) values ('Llz', 1);` | 1525099047 (2018/4/30 22:37:27) | `INSERT INTO tab SET @1=3 @2='Llz' @3=1` |
`update tab set clicks=2 where user='Bob';` | 1525099056 (2018/4/30 22:37:36) | `UPDATE tab WHERE @1=2 @2='Bob' @3=1 SET @1=2 @2='Bob' @3=2` |
`update tab set clicks=3 where user='Mary';` | 1525099065 (2018/4/30 22:37:45) | `UPDATE tab WHERE @1=1 @2='Mary' @3=2 SET @1=1 @2='Mary' @3=3` |
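Turning these row events into a plain event stream only requires keeping the header timestamp and the after-image of each row. The sketch below does that in Python; the tuple layout `(timestamp, kind, before, after)` is our own assumption for illustration, with rows as `(id, user, clicks)` to mirror `@1`, `@2`, `@3`.

```python
# The six row events from the table above, as (timestamp, kind, before, after).
# Rows are (id, user, clicks), mirroring @1, @2, @3 in the mysqlbinlog output.
ROW_EVENTS = [
    (1525099013, "INSERT", None, (1, "Mary", 1)),
    (1525099026, "INSERT", None, (2, "Bob", 1)),
    (1525099035, "UPDATE", (1, "Mary", 1), (1, "Mary", 2)),
    (1525099047, "INSERT", None, (3, "Llz", 1)),
    (1525099056, "UPDATE", (2, "Bob", 1), (2, "Bob", 2)),
    (1525099065, "UPDATE", (1, "Mary", 2), (1, "Mary", 3)),
]


def to_stream(row_events):
    """Keep the header timestamp plus user and clicks from each after-image.

    DELETE events (none occur here) have no after-image and would need
    separate handling, for example a deletion marker.
    """
    stream = []
    for ts, _kind, _before, after in row_events:
        _id, user, clicks = after
        stream.append((ts, user, clicks))
    return stream


if __name__ == "__main__":
    for record in to_stream(ROW_EVENTS):
        print(record)
```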
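Keeping only the header timestamp and the user and clicks values from each event's row image turns the binlog into the following event stream:

timestamp | user | clicks |
---|---|---|
1525099013 | Mary | 1 |
1525099026 | Bob | 1 |
1525099035 | Mary | 2 |
1525099047 | Llz | 1 |
1525099056 | Bob | 2 |
1525099065 | Mary | 3 |

Replaying this stream in timestamp order, with each event overwriting the previous values of the same row, yields exactly the final contents of tab:

user | clicks |
---|---|
Mary | 3 |
Bob | 2 |
Llz | 1 |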
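The replay step can be sketched in a few lines of Python. It is a minimal illustration that assumes rows can be keyed by user (true for this data; MySQL itself keys `tab` by `id`), and `replay` is a hypothetical helper name. Passing `until` stops the replay at a given timestamp, which yields the table as it looked at that moment.

```python
# The (timestamp, user, clicks) event stream from the first table above.
STREAM = [
    (1525099013, "Mary", 1),
    (1525099026, "Bob", 1),
    (1525099035, "Mary", 2),
    (1525099047, "Llz", 1),
    (1525099056, "Bob", 2),
    (1525099065, "Mary", 3),
]


def replay(stream, until=None):
    """Rebuild the table by replaying events in timestamp order.

    Rows are keyed by user (ASSUMED unique here). If `until` is given, only
    events with timestamp <= until are applied.
    """
    table = {}
    for ts, user, clicks in sorted(stream):
        if until is not None and ts > until:
            break
        table[user] = clicks  # a newer event for the same row overwrites the old value
    return table


if __name__ == "__main__":
    print(replay(STREAM))                    # final table: Mary 3, Bob 2, Llz 1
    print(replay(STREAM, until=1525099035))  # snapshot at 22:37:15: Mary 2, Bob 1
```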
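I have spent some time on MySQL replication and the binlog data format. The binlog carries timestamps: every operation on a table is recorded in time order to form the binlog, replaying the binlog's events is a stream-processing process, and the result of that replay is, once again, a table. In other words, operations on a table produce a time-stamped event stream, and processing that stream produces a continuously changing table, so tables and streams are equivalent and can be converted into each other. As time passes and DML operations keep running, the table's content keeps changing, as shown below: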
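As the figure above shows, streams and tables share the same characteristics: a table has a schema, data, and the time at which each DML operation was performed; a stream has a schema, data, and the time at which each record was processed.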
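Although most tables do not explicitly display the time of each DML operation, the database system does hold that information internally, and it corresponds to the processing time or event time of data on a stream. Because streams and tables share the same characteristics, they can be converted into each other without losing information; I call this stream-table duality.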
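The "lossless conversion in both directions" claim can be checked on the experiment's data with a small Python sketch. Assumptions: we have a timestamped snapshot of `tab` after every DML (keyed by user for brevity), and the change labels `insert`/`update`/`delete` and both function names are hypothetical. `table_to_changes` goes from table to stream by diffing consecutive snapshots; `changes_to_table` goes back by replaying the stream up to a timestamp.

```python
# Timestamped snapshots of tab after each DML in the experiment, keyed by user.
SNAPSHOTS = [
    (1525099013, {"Mary": 1}),
    (1525099026, {"Mary": 1, "Bob": 1}),
    (1525099035, {"Mary": 2, "Bob": 1}),
    (1525099047, {"Mary": 2, "Bob": 1, "Llz": 1}),
    (1525099056, {"Mary": 2, "Bob": 2, "Llz": 1}),
    (1525099065, {"Mary": 3, "Bob": 2, "Llz": 1}),
]


def table_to_changes(snapshots):
    """Table -> stream: diff consecutive snapshots into (ts, op, key, value) events."""
    changes, previous = [], {}
    for ts, table in snapshots:
        for key in sorted(table):
            if key not in previous:
                changes.append((ts, "insert", key, table[key]))
            elif previous[key] != table[key]:
                changes.append((ts, "update", key, table[key]))
        for key in sorted(previous):
            if key not in table:
                changes.append((ts, "delete", key, previous[key]))
        previous = dict(table)
    return changes


def changes_to_table(changes, until):
    """Stream -> table: replay events with ts <= until (events are in ts order)."""
    table = {}
    for ts, op, key, value in changes:
        if ts > until:
            break
        if op == "delete":
            table.pop(key, None)
        else:
            table[key] = value
    return table


if __name__ == "__main__":
    changes = table_to_changes(SNAPSHOTS)
    # Every snapshot can be rebuilt from the change stream: nothing was lost.
    print(all(changes_to_table(changes, ts) == t for ts, t in SNAPSHOTS))  # True
```

The round trip is lossless here because every change carries its timestamp; dropping the time attribute would still allow rebuilding the final table, but not the table as of an arbitrary earlier moment.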
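In the streaming world, the kind of table described above is called a Dynamic Table. On a stream, every arriving event is an update to the table's data (including inserts and deletes), so the table's content keeps changing, and the state of the stream at any moment corresponds one-to-one with a snapshot of the table. Streams and dynamic tables are equivalent along the time dimension, and we call this equivalence the duality of streams and Dynamic Tables.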
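As a rough illustration of a Dynamic Table, the Python sketch below keeps the result of a grouped count up to date as each click arrives: every event updates exactly one row of the result table and emits one change. It is a simplified model under our own assumptions (plain dicts, string-labelled changes, hypothetical function names), not Flink's API or its internal changelog encoding. Note that the `clicks` column in the experiment's `tab` is exactly this per-user count.

```python
from collections import Counter

# Clicks in arrival order; ASSUMED to be the same six clicks as the experiment.
USERS = ["Mary", "Bob", "Mary", "Llz", "Bob", "Mary"]


def continuous_count(users):
    """Maintain `SELECT user, COUNT(*) FROM clicks GROUP BY user` incrementally.

    Returns the change emitted for each event and the result-table snapshot
    after each event.
    """
    result = {}
    changes, snapshots = [], []
    for user in users:
        op = "update" if user in result else "insert"
        result[user] = result.get(user, 0) + 1
        changes.append((op, user, result[user]))
        snapshots.append(dict(result))
    return changes, snapshots


def batch_group_count(users):
    """The same query evaluated once over the bounded input."""
    return dict(Counter(users))


if __name__ == "__main__":
    changes, snapshots = continuous_count(USERS)
    for change in changes:
        print(change)
    print(snapshots[-1] == batch_group_count(USERS))  # True
```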
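This article explained why Apache Flink, as a stream processing platform, can offer users a SQL API. The fundamental reason is as follows. If we treat the data on a stream as structured data, the core of a streaming job is to turn structured data that has a time attribute into other structured data that also has a time attribute, and the binlog that records how a table changes is exactly streaming data with a time attribute. Streams and tables can be converted into each other without losing information, and this stream-table duality is what allows Apache Flink to use SQL as the development language for streaming jobs.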