深刻解讀flink sql cdc的使用以及源碼分析


  • 前言mysql

  • flink消費cdc數據git

    • canal formatgithub

    • debezium formatsql

    • CanalJson反序列化源碼解析數據庫

  • flink cdc connectorapache

    • 背景json

    • mysql-cdcbootstrap

    • mysql-cdc connector源碼解析api

  • changelog format數組

    • 使用場景

    • 示例

    • 源碼淺析

前言

CDC,Change Data Capture,變動數據獲取的簡稱,使用CDC咱們能夠從數據庫中獲取已提交的更改並將這些更改發送到下游,供下游使用。這些變動能夠包括INSERT,DELETE,UPDATE等.

用戶能夠在以下的場景使用cdc:

  • 實時數據同步:好比咱們將mysql庫中的數據同步到咱們的數倉中。

  • 數據庫的實時物化視圖。

flink消費cdc數據

在之前的數據同步中,好比咱們想實時獲取數據庫的數據,通常採用的架構就是採用第三方工具,好比canal、debezium等,實時採集數據庫的變動日誌,而後將數據發送到kafka等消息隊列。而後再經過其餘的組件,好比flink、spark等等來消費kafka的數據,計算以後發送到下游系統。總體的架構以下所示:

對於上面的這種架構,flink承擔的角色是計算層,目前flink提供的format有兩種格式:canal-json和debezium-json,下面咱們簡單的介紹下。

canal format

在國內,用的比較多的是阿里巴巴開源的canal,咱們可使用canal訂閱mysql的binlog日誌,canal會將mysql庫的變動數據組織成它固定的JSON或protobuf 格式發到kafka,以供下游使用。

canal解析後的json數據格式以下:

{
"data": [
{
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}

簡單講下幾個核心的字段:

  • type : 描述操做的類型,包括‘UPDATE’, 'INSERT', 'DELETE'。

  • data : 表明操做的數據。若是爲'INSERT',則表示行的內容;若是爲'UPDATE',則表示行的更新後的狀態;若是爲'DELETE',則表示刪除前的狀態。

  • old :可選字段,若是存在,則表示更新以前的內容,若是不是update操做,則爲 null。

完整的語義以下;

private String                    destination;                            // 對應canal的實例或者MQtopic
private String groupId; // 對應mqgroup id
private String database; // 數據庫或schema
private String table; // 表名
private List<String> pkNames;
private Boolean isDdl;
private String type; // 類型: INSERT UPDATE DELETE
// binlog executeTime
private Long es; // 執行耗時
// dml build timeStamp
private Long ts; // 同步時間
private String sql; // 執行的sql, dml sql爲空
private List<Map<String, Object>> data; // 數據列表
private List<Map<String, Object>> old; // 舊數據列表, 用於update, sizedatasize一一對應

在flink sql中,消費這個數據的sql以下:



CREATE TABLE topic_products (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- using canal-json as the format
)

其中DDL中的表的字段和類型要和mysql中的字段及類型能匹配的上,接下來咱們就能夠寫flink sql來查詢咱們定義的topic_products了。

debezium format

在國外,比較有名的相似canal的開源工具備debezium,它的功能較canal更增強大一些,不只僅支持mysql。還支持其餘的數據庫的同步,好比 PostgreSQL、Oracle等,目前debezium支持的序列化格式爲 JSON 和 Apache Avro 。

debezium提供的格式以下:

{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}

一樣,使用flink sql來消費的時候,sql和上面使用canal相似,只須要把foramt改爲debezium-json便可。

CanalJson反序列化源碼解析

接下來咱們看下flink的源碼中canal-json格式的實現。canal 格式做爲一種flink的格式,並且是source,因此也就是涉及到讀取數據的時候進行反序列化,咱們接下來就簡單看看CanalJson的反序列化的實現。具體的實現類是CanalJsonDeserializationSchema。

咱們看下這個最核心的反序列化方法:

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
try {
//使用json反序列化器將message反序列化成RowData
RowData row = jsonDeserializer.deserialize(message);

//獲取type字段,用於下面的判斷
String type = row.getString(2).toString();
if (OP_INSERT.equals(type)) {
// 若是操做類型是insert,則data數組表示的是要插入的數據,則循環遍歷data,而後添加一個標識INSERT,構造RowData對象,發送下游。
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
}
} else if (OP_UPDATE.equals(type)) {
// 若是是update操做,從data字段裏獲取更新後的數據、
ArrayData data = row.getArray(0);
// old字段獲取更新以前的數據
ArrayData old = row.getArray(1);
for (int i = 0; i < data.size(); i++) {
// the underlying JSON deserialization schema always produce GenericRowData.
GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
for (int f = 0; f < fieldCount; f++) {
if (before.isNullAt(f)) {
//若是old字段非空,則說明進行了數據的更新,若是old字段是null,則說明更新先後數據同樣,這個時候把before的數據也設置成after的,也就是發送給下游的beforeafter數據同樣。
before.setField(f, after.getField(f));
}
}
before.setRowKind(RowKind.UPDATE_BEFORE);
after.setRowKind(RowKind.UPDATE_AFTER);
//把更新先後的數據都發送下游
out.collect(before);
out.collect(after);
}
} else if (OP_DELETE.equals(type)) {
// 若是是刪除操做,data字段裏包含將要被刪除的數據,把這些數據組織起來發送給下游
ArrayData data = row.getArray(0);
for (int i = 0; i < data.size(); i++) {
RowData insert = data.getRow(i, fieldCount);
insert.setRowKind(RowKind.DELETE);
out.collect(insert);
}
} else {
if (!ignoreParseErrors) {
throw new IOException(format(
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Canal JSON message '%s'.", new String(message)), t);
}
}
}

flink cdc connector

背景

對於上面的架構,咱們須要部署canal(debezium)+ kafka,而後flink再從kafka消費數據,這種架構下咱們須要部署多個組件,而且數據也須要落地到kafka,有沒有更好的方案來精簡下這個流程呢?咱們接下來說講flink提供的cdc connector。

這個connector並無包含在flink的代碼裏,具體的地址是在https://github.com/ververica/flink-cdc-connectors裏,詳情你們能夠看下這裏面的內容。

這種架構下,flink直接消費數據庫的增量日誌,替代了原來做爲數據採集層的canal(debezium),而後直接進行計算,通過計算以後,將計算結果 發送到下游。總體架構以下:

使用這種架構是好處有:

  • 減小canal和kafka的維護成本,鏈路更短,延遲更低

  • flink提供了exactly once語義

  • 能夠從指定position讀取

  • 去掉了kafka,減小了消息的存儲成本

mysql-cdc

目前flink支持兩種內置的connector,PostgreSQL和mysql,接下來咱們以mysql爲例簡單講講。

在使用以前,咱們須要引入相應的pom,mysql的pom以下:

<dependency>
<groupId>com.alibaba.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>

若是是sql客戶端使用,須要下載 flink-sql-connector-mysql-cdc-1.1.0.jar 而且放到<FLINK_HOME>/lib/下面

鏈接mysql數據庫的示例sql以下:

CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
)

若是訂閱的是postgres數據庫,咱們須要把connector替換成postgres-cdc,DDL中表的schema和數據庫一一對應。

更加詳細的配置參見:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector

mysql-cdc connector源碼解析

接下來咱們以mysql-cdc爲例,看看源碼層級是怎麼實現的。既然做爲一個sql的connector,那麼就首先會有一個對應的TableFactory,而後在工廠類裏面構造相應的source,最後將消費下來的數據轉成flink認識的RowData格式,發送到下游。

咱們按照這個思路來看看flink cdc源碼的實現。

在flink-connector-mysql-cdc module中,找到其對應的工廠類:MySQLTableSourceFactory,進入createDynamicTableSource(Context context)方法,在這個方法裏,使用從ddl中的屬性裏獲取的host、dbname等信息構造了一個MySQLTableSource類。

MySQLTableSource

在MySQLTableSource#getScanRuntimeProvider方法裏,咱們看到,首先構造了一個用於序列化的對象RowDataDebeziumDeserializeSchema,這個對象主要是用於將Debezium獲取的SourceRecord格式的數據轉化爲flink認識的RowData對象。 咱們看下RowDataDebeziumDeserializeSchem#deserialize方法,這裏的操做主要就是先判斷下進來的數據類型(insert 、update、delete),而後針對不一樣的類型(short、int等)分別進行轉換,

最後咱們看到用於flink用於獲取數據庫變動日誌的Source函數是DebeziumSourceFunction,且最終返回的類型是RowData。

也就是說flink底層是採用了Debezium工具從mysql、postgres等數據庫中獲取的變動數據。

@SuppressWarnings("unchecked")
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType,
typeInfo,
((rowData, rowKind) -> {}),
serverTimeZone);
MySQLSource.Builder<RowData> builder = MySQLSource.<RowData>builder()
.hostname(hostname)
..........
DebeziumSourceFunction<RowData> sourceFunction = builder.build();

return SourceFunctionProvider.of(sourceFunction, false);
}

DebeziumSourceFunction

咱們接下來看看DebeziumSourceFunction類

@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
CheckpointedFunction,
ResultTypeQueryable<T> {
.............
}

咱們看到DebeziumSourceFunction類繼承了RichSourceFunction,而且實現了CheckpointedFunction接口,也就是說這個類是flink的一個SourceFunction,會從源端(run方法)獲取數據,發送給下游。此外這個類還實現了CheckpointedFunction接口,也就是會經過checkpoint的機制來保證exactly once語義。

接下來咱們進入run方法,看看是如何獲取數據庫的變動數據的。



@Override
public void run(SourceContext<T> sourceContext) throws Exception {
...........................
// DO NOT include schema change, e.g. DDL
properties.setProperty("include.schema.changes", "false");
...........................
//將全部的屬性信息打印出來,以便排查。
// dump the properties
String propsString = properties.entrySet().stream()
.map(t -> "\t" + t.getKey().toString() + " = " + t.getValue().toString() + "\n")
.collect(Collectors.joining());
LOG.info("Debezium Properties:\n{}", propsString);

//用於具體的處理數據的邏輯
this.debeziumConsumer = new DebeziumChangeConsumer<>(
sourceContext,
deserializer,
restoredOffsetState == null, // DB snapshot phase if restore state is null
this::reportError);

// create the engine with this configuration ...
this.engine = DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(debeziumConsumer) // 數據發給上面的debeziumConsumer
.using((success, message, error) -> {
if (!success && error != null) {
this.reportError(error);
}
})
.build();

if (!running) {
return;
}

// run the engine asynchronously
executor.execute(engine);

//循環判斷,當程序被打斷,或者有錯誤的時候,打斷engine,而且拋出異常
// on a clean exit, wait for the runner thread
try {
while (running) {
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
break;
}
if (error != null) {
running = false;
shutdownEngine();
// rethrow the error from Debezium consumer
ExceptionUtils.rethrow(error);
}
}
}
catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
}

在函數的開始,設置了不少的properties,好比include.schema.changes 設置爲false,也就是不包含表的DDL操做,表結構的變動是不捕獲的。咱們這裏只關注數據的增刪改。

接下來構造了一個DebeziumChangeConsumer對象,這個類實現了DebeziumEngine.ChangeConsumer接口,主要就是將獲取到的一批數據進行一條條的加工處理。

接下來定一個DebeziumEngine對象,這個對象是真正用來幹活的,它的底層使用了kafka的connect-api來進行獲取數據,獲得的是一個org.apache.kafka.connect.source.SourceRecord對象。經過notifying方法將獲得的數據交給上面定義的DebeziumChangeConsumer來來覆蓋缺省實現以進行復雜的操做。

接下來經過一個線程池ExecutorService來異步的啓動這個engine。

最後,作了一個循環判斷,當程序被打斷,或者有錯誤的時候,打斷engine,而且拋出異常。

總結一下,就是在Flink的source函數裏,使用Debezium 引擎獲取對應的數據庫變動數據(SourceRecord),通過一系列的反序列化操做,最終轉成了flink中的RowData對象,發送給下游。

changelog format

使用場景

當咱們從mysql-cdc獲取數據庫的變動數據,或者寫了一個group by的查詢的時候,這種結果數據都是不斷變化的,咱們如何將這些變化的數據發到只支持append mode的kafka隊列呢?

因而flink提供了一種changelog format,其實咱們很是簡單的理解爲,flink對進來的RowData數據進行了一層包裝,而後加了一個數據的操做類型,包括如下幾種 INSERT,DELETE, UPDATE_BEFORE,UPDATE_AFTER。這樣當下遊獲取到這個數據的時候,就能夠根據數據的類型來判斷下如何對數據進行操做了。

好比咱們的原始數據格式是這樣的。

{"day":"2020-06-18","gmv":100}

通過changelog格式的加工以後,成爲了下面的格式:

{"data":{"day":"2020-06-18","gmv":100},"op":"+I"}

也就是說changelog format對原生的格式進行了包裝,添加了一個op字段,表示數據的操做類型,目前有如下幾種:

  • +I:插入操做。

  • -U :更新以前的數據內容:

  • +U :更新以後的數據內容。

  • -D :刪除操做。

示例

使用的時候須要引入相應的pom

<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-format-changelog-json</artifactId>
<version>1.1.0</version>
</dependency>

使用flink sql操做的方式以下:

CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);

咱們定義了一個 format 爲 changelog-json 的kafka connector,以後咱們就能夠對其進行寫入和查詢了。

完整的代碼和配置請參考:
https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

源碼淺析

做爲一種flink的format ,咱們主要看下其序列化和發序列化方法,changelog-json 使用了flink-json包進行json的處理。

反序列化

反序列化用的是ChangelogJsonDeserializationSchema類,在其構造方法裏,咱們看到主要是構造了一個json的序列化器jsonDeserializer用於對數據進行處理。

public ChangelogJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
TimestampFormat timestampFormatOption)
{
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer = new JsonRowDataDeserializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
// the result type is never used, so it's fine to pass in Debezium's result type
resultTypeInfo,
false, // ignoreParseErrors already contains the functionality of failOnMissingField
ignoreParseErrors,
timestampFormatOption);
}

其中createJsonRowType方法指定了changelog的format是一種Row類型的格式,咱們看下代碼:

private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload = DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}

在這裏,指定了這個row格式有兩個字段,一個是data,表示數據的內容,一個是op,表示操做的類型。

最後看下最核心的ChangelogJsonDeserializationSchema#deserialize(byte[] bytes, Collector<RowData> out>)

@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
GenericRowData data = (GenericRowData) row.getField(0);
String op = row.getString(1).toString();
RowKind rowKind = parseRowKind(op);
data.setRowKind(rowKind);
out.collect(data);
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Debezium JSON message '%s'.", new String(bytes)), t);
}
}
}

使用jsonDeserializer對數據進行處理,而後對第二個字段op進行判斷,添加對應的RowKind。

序列化

序列化的方法咱們看下方法:ChangelogJsonSerializationSchema#serialize

@Override
public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
}

這塊沒有什麼難度,就是將flink的RowData使用jsonSerializer序列化成字節數組。

參考:
[1].https://www.bilibili.com/video/BV1zt4y1D7kt
[2].https://github.com/ververica/flink-cdc-connectors

因爲筆者水平有限,也不免有錯誤,請你們不吝賜教,更多信息,也請關注個人公衆號【大數據技術與應用實戰】,謝謝。


本文分享自微信公衆號 - 大數據技術與應用實戰(bigdata_bigdata)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索