數棧技術分享:開源·數棧-擴展FlinkSQL實現流與維表的join

1、擴展FlinkSQL實現流與維表的joinmysql

2、爲何要擴展FlinkSQL?git

一、實時計算須要徹底SQL化github

SQL是數據處理中使用最普遍的語言。它容許用戶簡明扼要地聲明他們的業務邏輯。大數據批計算使用SQL很常見,可是支持SQL的實時計算並很少。其實,用SQL開發實時任務能夠極大下降數據開發的門檻,在袋鼠雲數棧-實時計算模塊,咱們決定實現徹底SQL化。redis

數據計算採用SQL的優點sql

☑ 聲明式。用戶只須要表達我想要什麼,至於怎麼計算那是系統的事情,用戶不用關心。數據庫

☑ 自動調優。查詢優化器能夠爲用戶的 SQL 生成最有的執行計劃。用戶不須要了解它,就能自動享受優化器帶來的性能提高。api

☑ 易於理解。不少不一樣行業不一樣領域的人都懂 SQL,SQL 的學習門檻很低,用 SQL 做爲跨團隊的開發語言能夠很大地提升效率。緩存

☑ 穩定。SQL 是一個擁有幾十年歷史的語言,是一個很是穩定的語言,不多有變更。因此當咱們升級引擎的版本時,甚至替換成另外一個引擎,均可以作到兼容地、平滑地升級。網絡

參考連接:https://blog.csdn.net/weixin_33827965/article/details/86723623oracle

二、實時計算還須要流與維表的JOIN

在實時計算的世界裏不僅是流與流的JOIN,還須要流與維表的JOIN。在去年,袋鼠雲數棧V3.0版本研發期間,當時最新版本——flink1.6中FlinkSQL,已經將SQL的優點應用到Flink引擎中,但還未支持流與維表的JOIN。

FlinkSQL於2017年7月開始面向阿里巴巴集團開放流計算服務的,雖然是一個很是年輕的產品,可是到雙11期間已經支撐了數千個做業,在雙11期間,Blink 做業的處理峯值達到了5+億每秒,而其中僅 Flink SQL 做業的處理總峯值就達到了3億/秒。

參考連接:https://yq.aliyun.com/articles/457438

裏先解釋下什麼是維表;維表是動態表,表裏所存儲的數據有可能不變,也有可能定時更新,可是更新頻率不是很頻繁。在業務開發中通常的維表數據存儲在關係型數據庫如mysql,oracle等,也可能存儲在hbase,redis等nosql數據庫。

3、FlinkSQL實現流與維表的join分步走

一、用Flink api實現維表的功能

要實現維表功能就要用到 Flink Aysnc I/O 這個功能,是由阿里巴巴貢獻給Apache Flink的。

Async I/O 是由阿里巴巴貢獻給社區的,於1.2版本引入,主要目的是爲了解決與外部系統交互時網絡延遲成爲了系統瓶頸的問題。

具體介紹能夠看這篇文章:http://wuchong.me/blog/2017/05/17/flink-internals-async-io/

對應到Flink 的api就是RichAsyncFunction 這個抽象類,繼層這個抽象類實現裏面的open(初始化),asyncInvoke(數據異步調用),close(中止的一些操做)方法,最主要的是實現asyncInvoke 裏面的方法。

流與維表的join會碰到兩個問題:

1)第一個是性能問題。

由於流速要是很快,每一條數據都須要到維表作下join,可是維表的數據是存在第三方存儲系統,若是實時訪問第三方存儲系統,不只join的性能會差,每次都要走網絡io;還會給第三方存儲系統帶來很大的壓力,有可能會把第三方存儲系統搞掛掉。

因此解決的方法就是維表裏的數據要緩存,能夠全量緩存,這個主要是維表數據不大的狀況,還有一個是LRU緩存,維表數據量比較大的狀況。

2)第二個問題是流延遲過來的數據這麼跟以前的維表數據作關聯。

這個就涉及到維表數據須要存儲快照數據,因此這樣的場景用HBase 作維表是比較適合的,由於HBase 是天生支持數據多版本的。

二、解析流與維表join的SQL語法轉化成底層的FlinkAPI

由於FlinkSQL已經作了大部分SQL場景,咱們不可能在去解析SQL的全部語法,在把他轉化成底層FlinkAPI。

因此咱們作的就是解析SQL語法,來找到join表裏有沒有維表,若是有維表,那咱們會把這個join的維表的語句單獨拆來,用Flink的TableAPI和StreamAPi 生成新DataStream,在把這個DataStream與其餘的表在作join這樣就能用SQL來實現流與維表的join語法了。

SQL解析的工具就是用Apache calcite,Flink也是用這個框架作SQL解析的。因此全部語法都是能夠解析的。

1)DEMO SQL

insert 
into 
      MyResult 
      select 
            d.channel, 
            d.info 
      from 
             (       select a.*,b.info 
              from 
                       MyTable a 
              join sideTable b 
                       on a.channel=b.name     
              where a.channel = 'xc2’ 
                          and a.pv=10     ) as d

2)Calcite解析Insert into語句,拆分出子語句

select a.*,b.info from MyTable a join sideTable b on a.channel=b.name 
      where a.channel = 'xc2' and a.pv=10

 

select d.channel, d.info from d

 

insert into MyResult

3) Calcite繼續解析select語句

old: select a.*,b.info from MyTable a join sideTable b on a.channel=b.name 
 where a.channel = 'xc2' and a.pv=10

數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkXFlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!

github開源項目:https://github.com/DTStack/flinkx

gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx

相關文章
相關標籤/搜索