詳解flink中Look up維表的使用

  • 背景java

  • LookupableTableSourcemysql

  • 實例講解git

  • 源碼解析github

    • JdbcTableSourceredis

    • JdbcLookupFunctionsql


背景

在流式計算中,維表是一個很常見的概念,通常用於sql的join中,對流式數據進行數據補全,好比咱們的source stream是來自日誌的訂單數據,可是日誌中咱們只是記錄了訂單商品的id,並無其餘的信息,可是咱們把數據存入數倉進行數據分析的時候,卻須要商品名稱、價格等等其餘的信息,這種問題咱們能夠在進行流處理的時候經過查詢維表的方式對數據進行數據補全。數據庫

維表通常存儲在外部存儲中,好比mysql、hbase、redis等等,今天咱們以mysql爲例,講講flink中維表的使用。緩存

LookupableTableSource

在flink中提供了一個LookupableTableSource,能夠用於實現維表,也就是咱們能夠經過某幾個key列去查詢外部存儲來獲取相關的信息用於補全stream的數據。微信

public interface LookupableTableSource<T> extends TableSource<T> {

TableFunction<T> getLookupFunction(String[] lookupKeys);

AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

boolean isAsyncEnabled();
}

咱們看到,LookupableTableSource有三個方法app

  • getLookupFunction:用於同步查詢維表的數據,返回一個TableFunction,因此本質上仍是經過用戶自定義 UDTF來實現的。

  • getAsyncLookupFunction:用於異步查詢維表的數據,該方法返回一個對象

  • isAsyncEnabled:默認狀況下是同步查詢,若是要開啓異步查詢,這個方法須要返回true

在flink裏,咱們看到實現了這個接口的主要有四個類,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天咱們主要以jdbc爲例講講如何進行維表查詢。

實例講解

接下來咱們講一個小例子,首先定義一下stream source,咱們使用flink 1.11提供的datagen來生成數據。

咱們來模擬生成用戶的數據,這裏只生成的用戶的id,範圍在1-100之間。

CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
)

datagen具體的使用方法能夠參考:

聊聊flink 1.11 中的隨機數據生成器-DataGen connector

而後再建立一個mysql維表信息:

CREATE TABLE dim_mysql (
id int,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'userinfo',
'username' = 'root',
'password' = 'root'
)

咱們這個mysql表中樣例數據以下:

最後執行sql查詢,流表關聯維表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

結果示例以下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

咱們看到對於維表中存在的數據,已經關聯出來了,對於維表中沒有的數據,顯示爲null

完整代碼請參考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/dimension/JdbcDim.java

源碼解析

JdbcTableSource

以jdbc爲例,咱們來看看flink底層是怎麼作的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持異步的查詢,因此進入JdbcTableSource#getLookupFunction方法。

@Override
public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}

最終是構造了一個JdbcLookupFunction對象,

  • options是鏈接jdbc的一些參數,好比user、pass、url等。

  • lookupOptions是一些有關維表的參數,主要是緩存的大小、超時時間等。

  • lookupKeys也就是要去關聯查詢維表的字段。

JdbcLookupFunction

因此咱們來看看JdbcLookupFunction類,這個JdbcLookupFunction是一個TableFunction的子類,具體的TableFunction的使用能夠參考這個文章:

Flink實戰教程-自定義函數之TableFunction

一個TableFunction最核心的就是eval方法,在這個方法裏,作的主要的工做就是經過傳進來的多個keys拼接成sql去來查詢數據,首先查詢的是緩存,緩存有數據就直接返回,緩存沒有的話再去查詢數據庫,而後再將查詢的結果返回並放入緩存,下次查詢的時候直接查詢緩存。

爲何要加一個緩存呢?默認狀況下是不開啓緩存的,每來一個查詢,都會給維表發送一個請求去查詢,若是數據量比較大的話,勢必會給存儲維表的系統形成必定的壓力,因此flink提供了一個LRU緩存,查詢維表的時候,先查詢緩存,緩存沒有再去查詢外部系統,可是若是有一個數據查詢頻率比較高,一直被命中,就沒法獲取新數據了。因此緩存還要加一個超時時間,過了這個時間,把這個數據強制刪除,去外部系統查詢新的數據。

具體的怎麼開啓緩存呢?咱們看下JdbcLookupFunction#open方法

@Override
public void open(FunctionContext context) throws Exception {
try {
establishConnectionAndStatement();
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}

也就是說cacheMaxSize和cacheExpireMs須要同時設置,就會構造一個緩存對象cache來緩存數據.這兩個參數對應的DDL的屬性就是lookup.cache.max-rows和lookup.cache.ttl

對於具體的緩存的大小和超時時間的設置,用戶須要根據自身的狀況來本身定義,在數據的準確性和系統的吞吐量之間作一個權衡。

更多幹貨信息,歡迎關注個人公衆號【大數據技術與應用實戰】


本文分享自微信公衆號 - 大數據技術與應用實戰(bigdata_bigdata)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索