Flink 1.11 最重要的 Feature —— Hive Streaming 以前已經和你們分享過了,今天就和你們來聊一聊另外一個特別重要的功能 —— CDC。git
何爲CDC?Change Data Capture,將數據庫中的’增’、’改’、’刪’操做記錄下來。在很早以前是經過觸發器來完成記錄,如今經過 binlog+同步中間件來實現。經常使用的 binlog 同步中間件有不少,好比 Alibaba 開源的 canal[1],Red Hat 開源的debezium[2],Zendesk 開源的 Maxwell[3] 等等。github
這些中間件會負責 binlog 的解析,並同步到消息中間件中,咱們只須要消費對應的 Topic 便可。數據庫
回到 Flink 上,CDC 彷佛和咱們沒有太大的關聯?其實否則,讓咱們更加抽象地來看這個世界。apache
當咱們用 Flink 去消費數據好比 Kafka 時,咱們就彷彿在讀一張表,什麼表?一張不斷有記錄被插入的表,咱們將每一條被插入的數據取出來,完成咱們的邏輯。json
當插入的每條數據都沒有問題時,一切都很美好。關聯、聚合、輸出。bootstrap
但當咱們發現,某條已經被計算過的數據有問題時,麻煩大了。咱們直接改最後的輸出值實際上是沒有用的,此次改了,當再來數據觸發計算時,結果仍是會被錯誤的數據覆蓋,由於中間計算結果沒有被修改,它仍然是一個錯誤的值。怎麼辦?撤回流彷佛能解決這個問題,這也確實是解決這個問題的手段,可是問題來了,撤回流怎麼肯定讀取的數據是要被撤回的?另外,怎麼去觸發一次撤回?數組
CDC 解決了這些:將消息中間件的數據反序列化後,根據 Type 來識別數據是 Insert 仍是 Delete;另外,若是你們看過 Flink 源碼,會發現反序列化後的數據類型變了,從 Row 升級爲 RowData,RowData 可以將數據標記爲撤回仍是插入,這就意味着每一個算子可以判斷出數據究竟是須要下發仍是撤回。工具
CDC 的重要性就先說這麼多,以後有機會的話,出一篇實時 DQC 的視頻,告訴你們 CDC 的出現,對於實時 DQC 的幫助有多大。下面讓咱們回到正題。spa
既然有那麼多 CDC 同步中間件,那麼必定會有各類各樣的格式存放在消息中間件中,咱們必然須要去解析它們。因而 Flink 1.11 提供了 canal-json 和 debezium-json,但咱們用的是 Maxwell 怎麼辦?只能等官方出或者說是等有人向社區貢獻嗎?那若是咱們用的是自研的同步中間件怎麼辦?3d
因此就有了今天的分享:如何去自定義實現一個 Maxwell format。你們也能夠基於此文的思路去實現其餘 CDC format,好比 OGG, 或是自研 CDC 工具產生的數據格式。
當咱們提交任務以後,Flink 會經過 SPI 機制將 classpath 下注冊的全部工廠類加載進來,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而對於 Format 來講,到底使用哪一個 DeserializationFormatFactory,是根據 DDL 語句中的 Format 來決定的。經過將 Format 的值與工廠類的 factoryIdentifier() 方法的返回值進行匹配 來肯定。
再經過 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,將反序列化對象提供給 DynamicTableSource。
經過圖來了解整個過程(僅從反序列化數據並消費的角度來看):
想要實現 CDC Format 去解析某種 CDC 工具產生的數據其實很簡單,核心組件其實就三個:
再經過代碼,來看看反序列化中的細節:
public void deserialize(byte[] message, Collectorout) throws IOException { try { RowData row = jsonDeserializer.deserialize(message); String type = row.getString(2).toString(); // "type" field if (OP_INSERT.equals(type)) { RowData insert = row.getRow(0, fieldCount); insert.setRowKind(RowKind.INSERT); out.collect(insert); } else if (OP_UPDATE.equals(type)) { GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" field GenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" field for (int f = 0; f < fieldCount; f++) { if (before.isNullAt(f)) { before.setField(f, after.getField(f)); } } before.setRowKind(RowKind.UPDATE_BEFORE); after.setRowKind(RowKind.UPDATE_AFTER); out.collect(before); out.collect(after); } else if (OP_DELETE.equals(type)) { RowData delete = row.getRow(0, fieldCount); delete.setRowKind(RowKind.DELETE); out.collect(delete); } else { if (!ignoreParseErrors) { throw new IOException(format( "Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message))); } } } catch (Throwable t) { if (!ignoreParseErrors) { throw new IOException(format( "Corrupt Maxwell JSON message '%s'.", new String(message)), t); } } }
其實並不複雜:先經過 jsonDeserializer 將字節數組根據 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,而後根據 「type」 列的值來判斷數據是什麼類型:增、改、刪;再根據數據類型取出 「data」 或者 「old」 區的數據,來組裝成 Flink 認識的 INSERT/DELETE/UPDATE 數據並下發。
對象 jsonDeserializer 即 JSON 格式的反序列化器,它能夠經過指定的 RowType 類型,讀取 JSON 的字節數組中指定的字段並反序列化成 RowData。在咱們的場景中,咱們須要去讀取以下 Maxwell 數據的 「data」, 「old」 和 「type」 部分的數據。
{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}
所以 MaxwellJsonDeserializationSchema 中定義的 JSON 的 RowType 以下所示。
private RowType createJsonRowType(DataType databaseSchema) { // Maxwell JSON contains other information, e.g. "database", "ts" // but we don't need them return (RowType) DataTypes.ROW( DataTypes.FIELD("data", databaseSchema), DataTypes.FIELD("old", databaseSchema), DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType(); }
databaseSchema 是用戶經過 DDL 定義的 schema 信息,也對應着數據庫中表的 schema。結合上面的 JSON 和代碼,咱們可以得知 jsonDeserializer 只會取走 byte[] 中 data、old、type 這三個字段對應的值,其中 data 和old 仍是個嵌套JSON,它們的 schema 信息和 databaseSchema 一致。因爲 Maxwell 在同步數據時,「old」區不包含未被更新的字段,因此 jsonDeserializer 返回後,咱們會經過 「data」 區的 RowData 將 old 區的缺失字段補齊。
獲得 RowData 以後,會取出 type 字段,而後根據對應的值,會有三種分支:
處理的過程當中,若是拋出異常,會根據 DDL 中maxwell-json.ignore-parse-errors的值來肯定是忽視這條數據繼續處理下一條數據,仍是讓任務報錯。
筆者在 maxwell-json 反序列化功能的基礎之上,還實現了序列化的功能,即能將 Flink 產生的 changelog 以 Maxwell 的 JSON 格式輸出到外部系統中。其實現思路與反序列化器的思路正好相反,更多細節能夠參考 Pull Request 中的實現。
PR 實現詳情連接:
https://github.com/apache/fli...
給你們演示一下從 Kafka 中讀取 Maxwell 推送來的 maxwell json 格式數據,並將聚合後的數據再次寫入 Kafka 後,從新讀出來驗證數據是否正確。
CREATE TABLE topic_products ( -- schema is totally the same to the MySQL "products" table id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'maxwell', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'maxwell-json');
CREATE TABLE topic_sink ( name STRING, sum_weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'maxwell-sink', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'maxwell-json' );
-- 注意,這部分 SQL 在 MySQL 中執行,不是 Flink 中的表 CREATE TABLE product ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), description VARCHAR(512), weight FLOAT ); truncate product ; ALTER TABLE product AUTO_INCREMENT = 101; INSERT INTO product VALUES (default,"scooter","Small 2-wheel scooter",3.14), (default,"car battery","12V car battery",8.1), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), (default,"hammer","12oz carpenter's hammer",0.75), (default,"hammer","14oz carpenter's hammer",0.875), (default,"hammer","16oz carpenter's hammer",1.0), (default,"rocks","box of assorted rocks",5.3), (default,"jacket","water resistent black wind breaker",0.1), (default,"spare tire","24 inch spare tire",22.2); UPDATE product SET description='18oz carpenter hammer' WHERE id=106; UPDATE product SET weight='5.1' WHERE id=107; INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2); INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18); UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110; UPDATE product SET weight='5.17' WHERE id=111; DELETE FROM product WHERE id=111; UPDATE product SET weight='5.17' WHERE id=102 or id = 101; DELETE FROM product WHERE id=102 or id = 103;
先看看能不能正常讀取 Kafka 中的 maxwell json 數據。
select * from topic_products;
能夠看到,全部字段值都變成了 Update 以後的值,同時,被 Delete 的數據也沒有出現。
接着讓咱們再將聚合數據寫入 Kafka。
insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;
在 Flink 集羣的 Web 頁面也可以看到任務正確提交,接下來再讓咱們把聚合數據查出來。
select * from topic_sink
最後,讓咱們查詢一下 MySQL 中的表,來驗證數據是否一致;由於在 Flink 中,咱們將 weight 字段定義成 Decimal(10,2),因此咱們在查詢 MySQL 的時候,須要將 weight 字段進行類型轉換。
沒有問題,咱們的 maxwell json 解析很成功。
根據筆者實現 maxwell-json format 的經驗,Flink 對於接口的定義、對於模塊職責的劃分仍是很清晰的,因此實現一個自定義 CDC format 很是簡單(核心代碼只有200多行)。所以,若是你是用的 OGG,或是自研的同步中間件,能夠經過本文的思路快速實現一個 CDC format,一塊兒解放你的 CDC 數據!
參考連接:
[1]https://github.com/alibaba/canal
[2]https://debezium.io/
[3]https://maxwells-daemon.io/