數棧技術分享:用短平快的方式告訴你Flink-SQL的擴展實現

數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!mysql

github開源項目:https://github.com/DTStack/flinkxgit

gitee開源項目:https://gitee.com/dtstack_dev_0/flinkxgithub

 

首先,本文所述均基於flink 1.5.4正則表達式

1、咱們爲何擴展Flink-SQL?

因爲Flink 自己SQL語法並不提供在對接輸入源和輸出目的的SQL語法。數據開發在使用的過程當中須要根據其提供的Api接口編寫Source和 Sink, 異常繁瑣,不只須要瞭解FLink 各種Operator的API,還須要對各個組件的相關調用方式有了解(好比kafka,redis,mongo,hbase等),而且在須要關聯到外部數據源的時候沒有提供SQL相關的實現方式,所以數據開發直接使用Flink編寫SQL做爲實時的數據分析時須要較大的額外工做量。redis

咱們的目的是在使用Flink-SQL的時候只須要關心作什麼,而不須要關心怎麼作。不須要過多的關心程序的實現,專一於業務邏輯。sql

接下來,咱們一塊兒來看下Flink-SQL的擴展實現吧!緩存

2、擴展了哪些flink相關sql

一、建立源表語句網絡

二、建立輸出表語句異步

三、建立自定義函數函數

四、維表關聯

3、各個模塊是如何翻譯到flink的實現

一、如何將建立源表的sql語句轉換爲flink的operator

Flink中表的都會映射到Table這個類。而後調用註冊方法將Table註冊到environment。

StreamTableEnvironment.registerTable(tableName, table);

當前咱們只支持kafka數據源。Flink自己有讀取kafka 的實現類, FlinkKafkaConsumer09,因此只須要根據指定參數實例化出該對象。並調用註冊方法註冊便可。

另外須要注意在flink sql常常會須要用到rowtime, proctime, 因此咱們在註冊表結構的時候額外添加rowtime,proctime。

當須要用到rowtime的使用須要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要作兩個事情:1:如何從Row中獲取時間字段。 2:設定最大延遲時間。

二、 如何將建立的輸出表sql語句轉換爲flink的operator

Flink輸出Operator的基類是OutputFormat, 咱們這裏繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實現了獲取運行環境的方法getRuntimeContext(), 方便於咱們以後自定義metric等操做。

咱們以輸出到mysql插件mysql-sink爲例,分兩部分:

  • 將create table 解析出表名稱,字段信息,mysql鏈接信息。

該部分使用正則表達式的方式將create table 語句轉換爲內部的一個實現類。該類存儲了表名稱,字段信息,插件類型,插件鏈接信息。

  • 繼承RichOutputFormat將數據寫到對應的外部數據源。

主要是實現writeRecord方法,在mysql插件中其實就是調用jdbc 實現插入或者更新方法。

三、如何將自定義函數語句轉換爲flink的operator;

Flink對udf提供兩種類型的實現方式:

1)繼承ScalarFunction

2)繼承TableFunction

須要作的將用戶提供的jar添加到URLClassLoader, 並加載指定的class (實現上述接口的類路徑),而後調用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的註冊。以後便可使用改定義的udf;

四、維表功能是如何實現的?

流計算中一個常見的需求就是爲數據流補齊字段。由於數據採集端採集到的數據每每比較有限,在作數據分析以前,就要先將所需的維度信息補全,可是當前flink並未提供join外部數據源的SQL功能。

實現該功能須要注意的幾個問題:

1)維表的數據是不斷變化的

在實現的時候須要支持定時更新內存中的緩存的外部數據源,好比使用LRU等策略。

2)IO吞吐問題

若是每接收到一條數據就串行到外部數據源去獲取對應的關聯記錄的話,網絡延遲將會是系統最大的瓶頸。這裏咱們選擇阿里貢獻給flink社區的算子RichAsyncFunction。該算子使用異步的方式從外部數據源獲取數據,大大減小了花費在網絡請求上的時間。

3)如何將sql 中包含的維表解析到flink operator

爲了從sql中解析出指定的維表和過濾條件, 使用正則明顯不是一個合適的辦法。須要匹配各類可能性。將是一個無窮無盡的過程。查看flink自己對sql的解析。它使用了calcite作爲sql解析的工做。將sql解析出一個語法樹,經過迭代的方式,搜索到對應的維表;而後將維表和非維表結構分開。

經過上述步驟能夠經過SQL完成經常使用的從kafka源表,join外部數據源,寫入到指定的外部目的結構中。

相關文章
相關標籤/搜索