flink教程-詳解flink 1.11 中的CDC (Change Data Capture)

這篇文章是開始的時候寫了篇隨筆,更深刻的cdc的使用和源碼分析請參考:深刻解讀flink sql cdc的使用以及源碼分析html

  • CDC簡介
  • Canal
  • CanalJson反序列化源碼解析

CDC簡介

CDC,Change Data Capture,變動數據獲取的簡稱,使用CDC咱們能夠從數據庫中獲取已提交的更改並將這些更改發送到下游,供下游使用。這些變動能夠包括INSERT,DELETE,UPDATE等,mysql

用戶能夠在如下的場景下使用CDC:sql

  • 使用flink sql進行數據同步,能夠將數據從一個數據同步到其餘的地方,好比mysql、elasticsearch等。
  • 能夠在源數據庫上實時的物化一個聚合視圖
  • 由於只是增量同步,因此能夠實時的低延遲的同步數據
  • 使用EventTime join 一個temporal表以即可以獲取準確的結果

flink 1.11 將這些changelog提取並轉化爲table apa和sql,目前支持兩種格式:Debezium和Canal,這就意味着源表不單單是append操做,並且還有upsert、delete操做。數據庫

image

Canal

接下來咱們使用canal爲例簡單介紹下CDC的使用apache

canal 格式:json

{
  "data": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager V2"
    }
  ],
  "old": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager"
    }
  ],
  "database": "canal_manager",
  "es": 1568972368000,
  "id": 11,
  "isDdl": false,
  "mysqlType": {...},
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {...},
  "table": "canal_user",
  "ts": 1568972369005,
  "type": "UPDATE"
}

簡單講下幾個核心的字段:bootstrap

  • type : 描述操做的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。
  • data : 表明操做的數據。若是爲'INSERT',則表示行的內容;若是爲'UPDATE',則表示行的更新後的狀態;若是爲'DELETE',則表示刪除前的狀態。
  • old :可選字段,若是存在,則表示更新以前的內容,若是不是update操做,則爲 null。

完整的語義以下;api

private String                    destination;                            // 對應canal的實例或者MQ的topic
    private String                    groupId;                                // 對應mq的group id
    private String                    database;                               // 數據庫或schema
    private String                    table;                                  // 表名
    private List<String>              pkNames;
    private Boolean                   isDdl;
    private String                    type;                                   // 類型: INSERT UPDATE DELETE
    // binlog executeTime
    private Long                      es;                                     // 執行耗時
    // dml build timeStamp
    private Long                      ts;                                     // 同步時間
    private String                    sql;                                    // 執行的sql, dml sql爲空
    private List<Map<String, Object>> data;                                   // 數據列表
    private List<Map<String, Object>> old;                                    // 舊數據列表, 用於update, size和data的size一一對應
-- 定義的字段和data 裏面的數據想匹配 
CREATE TABLE my_table (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'canal-json.ignore-parse-errors'='true' -- 忽略解析錯誤,缺省值false
);

CanalJson反序列化源碼解析

canal 格式也是做爲一種flink的格式,並且是source,因此也就是涉及到讀取數據的時候進行反序列化,咱們接下來就簡單看看CanalJson的反序列化的實現。具體的實現類是CanalJsonDeserializationSchema。數組

咱們看下這個最核心的反序列化方法:app

@Override
	public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
		try {
		    //使用json反序列化器將message反序列化成RowData
			RowData row = jsonDeserializer.deserialize(message);
			
			//獲取type字段,用於下面的判斷
			String type = row.getString(2).toString();
			if (OP_INSERT.equals(type)) {
				// 若是操做類型是insert,則data數組表示的是要插入的數據,則循環遍歷data,而後添加一個標識INSERT,構造RowData對象,發送下游。
				ArrayData data = row.getArray(0);
				for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
					insert.setRowKind(RowKind.INSERT);
					out.collect(insert);
				}
			} else if (OP_UPDATE.equals(type)) {
				// 若是是update操做,從data字段裏獲取更新後的數據、
				ArrayData data = row.getArray(0);
				// old字段獲取更新以前的數據
				ArrayData old = row.getArray(1);
				for (int i = 0; i < data.size(); i++) {
					// the underlying JSON deserialization schema always produce GenericRowData.
					GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
					GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
					for (int f = 0; f < fieldCount; f++) {
						if (before.isNullAt(f)) {
							//若是old字段非空,則說明進行了數據的更新,若是old字段是null,則說明更新先後數據同樣,這個時候把before的數據也設置成after的,也就是發送給下游的before和after數據同樣。
							before.setField(f, after.getField(f));
						}
					}
					before.setRowKind(RowKind.UPDATE_BEFORE);
					after.setRowKind(RowKind.UPDATE_AFTER);
					//把更新先後的數據都發送下游
					out.collect(before);
					out.collect(after);
				}
			} else if (OP_DELETE.equals(type)) {
				// 若是是刪除操做,data字段裏包含將要被刪除的數據,把這些數據組織起來發送給下游
				ArrayData data = row.getArray(0);
				for (int i = 0; i < data.size(); i++) {
					RowData insert = data.getRow(i, fieldCount);
					insert.setRowKind(RowKind.DELETE);
					out.collect(insert);
				}
			} else {
				if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a big try catch to protect the processing.
			if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Canal JSON message '%s'.", new String(message)), t);
			}
		}
	}

參考資料:
[1].https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289
[2].https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc

更多內容,歡迎關注個人公衆號【大數據技術與應用實戰】
image

相關文章
相關標籤/搜索