本文做者是來自 TiBoys 隊的崔秋同窗,他們的項目 TBSSQL 在 TiDB Hackathon 2018 中得到了一等獎。TiDB Batch and Streaming SQL(簡稱 TBSSQL)擴展了 TiDB 的 SQL 引擎,支持用戶以相似 StreamSQL 的語法將 Kafka、Pulsar 等外部數據源以流式表的方式接入 TiDB。經過簡單的 SQL 語句,用戶能夠實現對流式數據的過濾,流式表與普通表的 Join(好比流式事實表與多個普通維度表),甚至經過 CREATE TABLE AS SELECT 語法將處理過的流式數據寫入普通表中。此外,針對流式數據的時間屬性,咱們實現了基於時間窗口的聚合/排序算子,使得咱們能夠對流式數據進行時間維度的聚合/排序。html
算起來這應該是第三次參加的 Hackathon 了,第一次參加的時候仍是在小西天的豌豆莢,和東旭一塊兒,作跨平臺數據傳輸的工具,兩天一晚上;第二次和奇叔一塊兒在 3W 咖啡,又是兩天一晚上;此次在本身家舉辦 Hackathon 比賽,下定決心必定要佛性一些,本着能抱大腿就不單幹的心態,迅速決定拉唐長老(唐劉)下水。接下來就計劃着折騰點啥,由於咱們兩個前端都不怎麼樣,因此只能硬核一些,因而拍了兩個方案。前端
方案一:以前跟唐長老合做過很長一段時間,咱們兩個對於測試質量之類的事情也都很是關注,因此想着能不能在 Chaos 系統上作一些文章,把一些前沿的測試理論和經驗方法結合到系統裏面來,作一套通用的分佈式系統測試框架,就像 Jepsen 那樣,用這套系統去測試和驗證主流的開源分佈式項目。git
方案二:越接近於業務實時性的數據處理越有價值,無論是 Kafka/KSQL,Flink/Spark Streaming 都是在向着實時流計算領域方向進行將來的探索。TiDB 雖然已經可以支持類 Real Time OLAP 的場景,可是對於更實時的流式數據處理方面尚未合適的解決方案,不過 TiDB 具備很是好的 Scale 能力,自然的能存儲海量的數據庫表數據,因此在 Streaming Event 和 Table 關聯的場景下具備很是明顯的優點。若是在 TiDB 上可以實現一個 Streaming SQL 的引擎,實現 Batch/Streaming 的計算融合,那將會是一件很是有意思的事情。github
由於打 Hackathon 比賽主要是但願折騰一些新的東西,因此咱們兩個簡單討論完了以後仍是傾向於方案二,固然作不作的出來另說。算法
當咱們正準備作前期調研和設計的時候,Hackathon 主辦方把唐長老拉去作現場導師,參賽規則規定導師不能下場比賽,囧,因而就這樣被被動放了鴿子。好在後來遇到了一樣被霸哥(韓飛)當導師而放鴿子的川總(杜川),川總對於 Streaming SQL 很是感興趣,因而難兄難弟一拍即合,迅速決定抱團取暖。隨後,Robot 又介紹了一樣尚未組隊的社區小夥伴 GZY(高志遠),這樣算是湊齊了三我的,可是一想到沒有前端確定搞不定,因而就拜託孃家人(Dashbase)的交際小王子 WPH(王鵬翰)出馬,幫助去召喚一個靠譜的前端小夥伴,後來交際未果直接把本身賣進了隊伍,這樣終於湊齊了四後端,不,應該是三後端 + 一僞前端的組合。sql
由於立刻要準備提交項目和團隊名稱,你們都一致以爲方案二很是有意思,因此就選定了更加儒雅的 TBSSQL(TiDB Batch and Streaming SQL)做爲項目名稱,TSBSQL 遺憾落選。在團隊名稱方面,打醬油老男孩 / Scboy / TiStream / 養生 Hackathon / 佛系 Hackathon 都由於不夠符合氣質被遺憾淘汰,最後表明更有青春氣息的 TiBoys 入選(跟着我左手右手一個慢動做,逃……數據庫
所謂 「三軍未動, 糧草先行」,既然已經報名了,仍是要稍做準備,雖然已經肯定了大的方向,可是具體的落地方案尚未細化,並且人員的分工也不是太明確。又通過一輪簡單的討論以後,明確了你們的職責方向,我這邊主要負責項目總體設計,進度管理以及和 TiDB 核心相關的代碼,川總主要負責 TiDB 核心技術攻關,GZY 負責流數據源數據的採集部分,WPH 負責前端展示以及 Hackathon 當天的 Demo 演示,分工以後你們就開始分頭調研動工。apache
做爲這兩年來基本沒怎麼寫過代碼的退役型選手來講,內心仍是很是沒底的,也不知道如今 TiDB 代碼結構和細節變成什麼樣了,不求有功,但求別太拖後腿。編程
對於項目自己的典型應用場景,你們仍是比較明確的,以爲這個方向是很是有意義的。後端
內部數據系統:
業界 Streaming 相關的系統不少,前期我這邊快速地看了下能不能站在巨人的肩膀上作事情,有沒有可借鑑或者可借用的開源項目。
本質上 Apache Beam 仍是一個批處理和流處理融合的 SDK Model,用戶能夠在應用層使用更簡單通用的函數接口實現業務的處理,若是使用 Beam 的話,還須要實現自定義的 Runner,由於 TiDB 自己主要的架構設計很是偏重於數據庫方向,內部並無特別明確的通用型計算引擎,因此現階段基本上沒有太大的可行性。固然也能夠選擇用 Flink 做爲 Runner 鏈接 TiDB 數據源,可是這就變成了 Flink&TiDB 的事情了,和 Beam 自己關係其實就不大了。
Flink 是一個典型的流處理系統,批處理能夠用流處理來模擬出來。
自己 Flink 也是支持 SQL 的,可是是一種嵌入式 SQL,也就是 SQL 和應用程序代碼寫在一塊兒,這種作法的好處是能夠直接和應用層進行整合,可是很差的地方在於,接口不是太清晰,有業務侵入性。阿里內部有一個加強版的 Flink 項目叫 Blink,在這個領域比較活躍。若是要實現批處理和流處理融合的話,須要內部定製和修改 Flink 的代碼,把 TiDB 做爲數據源對接起來,還有可能須要把一些環境信息提交給 TiDB 以便獲得更好的查詢結果,固然或許像 TiSpark 那樣,直接 Flink 對接 TiKV 的數據源應該也是能夠的。由於自己團隊對於 Scala/Java 代碼不是很熟悉,並且 Flink 的模式會有必定的侵入性,因此就沒有在這方面進行更多的探索。同理,沒有選擇 Spark Streaming 也是相似的緣由。固然有興趣的小夥伴能夠嘗試下這個方向,也是很是有意思的。
由於 Kafka 自己只是一個 MQ,之後會向着流處理方向演進,可是目前並無實現批處理和流處理統一的潛力,因此更多的咱們只是借鑑 Kafka SQL 的語法。目前 Streaming SQL 尚未一個統一的標準 SQL,Kafka SQL 也只是一個 SQL 方言,支持的語法還比較簡單,可是很是實用,並且是偏交互式的,沒有業務侵入性。很是適合在 Hackathon 上作 Demo 演示,咱們在項目實現中也是主要參考了 Kafka SQL 的定義,固然,Flink 和 Calcite 也有本身定義的 Streaming 語法,這裏就再也不討論了。
調研準備工做討論到這裏基本上也就差很少了,因而咱們開始各自備(hua)戰(shui),出差的出差,加班的加班,接客戶的接客戶,學 Golang 的學 Golang,在這種緊(fang)張(fei)無(zi)比(wo)的節奏中,迎來了 Hackathon 比賽的到來。
具體的技術實現方面都是比較硬核的東西,細節也比較多,扔在最後面寫,免的你們看到一半就點×了。至於參加 Hackathon 的感覺,由於不像龍哥那麼文豪,也不像馬老師那麼俏皮,並且原本讀書也很少,因此也只能喊一句「黑客馬拉松真是太好玩了」!
因爲飛機晚點,川總這個點兒才展轉到酒店。睡覺以前很是擔憂一覺睡過頭,讓這趟 Hackathon 之旅還沒開始就結束了,沒想到躺下之後滿腦子都是技術細節,怎麼都睡不着。漫漫長夜,無眠。
川總早早來到 Hackathon 現場。因爲來太早,其餘選手都還沒到,因此他提早刺探刺探敵情的計劃也泡湯了,只好在賽場瞎晃悠一番熟悉熟悉環境,順道跟大獎合了個影。
簡單的開幕式以後,Hackathon 正式開始。咱們首先搞定的是 Streaming SQL 的語法定義以及 Parser 相關改動。這一部分在以前就通過比較詳細的在線討論了,因此現場只須要根據碰頭後統一的想法一頓敲敲敲就搞定了。快速搞定這一塊之後,咱們就有了 SQL 語法層面的 Streaming 實現。固然此時 Streaming 也僅限於語法層面,Streaming 在 SQL 引擎層面對應的其實仍是普通的TiDB Table。
接下來是 DDL 部分。這一塊咱們已經想好了要複用 TiDB Table 的 Meta 結構 TableInfo ,所以主要工做就是按照 DDL源碼解析 依葫蘆畫瓢,難度也不大,以致於咱們還有閒心糾結一下 SHOW TABLES 語法裏到底要不要屏蔽掉 Streaming Table 的問題。
總體上來看上午的熱身活動仍是進行的比較順利的,起碼 Streaming DDL 這塊沒有成爲太大的問題。這裏面有個插曲就是我在 Hackathon 以前下載編譯 TiDB,結果發現 TiDB 的 parser 已經用上時髦的 go module 了(也是很久很久沒看 TiDB 代碼),折騰好半天,不過好處就是 Hackathon 當天的時候改起來 parser 就比較輕車熟路了,因此賽前編譯一個 TiDB 仍是很是有必要的。
隨着熱身的結束,立刻迎來了穩定的敲敲敲階段。川總簡單弄了一個 Mock 的 StreamReader 而後丟給了我,由於我以前寫 TiDB 的時候,時代比較遙遠,那時候都還在用周 sir 的 Datum,如今一看,爲了提升內存效率和性能,已經換成了高大上的 Chunk,因而一個很常見的問題:如何用最正確的作法把一個傳過來的 Json 數據格式化成 Table Row 數據放到 Chunk 裏面,讓完全我懵逼了。
這裏面倒不是技術的問題,主要是類型太多,若是枚舉全部類型,搞起來很麻煩,按道理應該有更輕快的辦法,可是翻了源代碼仍是沒找到解決方案。這個時候果斷去求助現場導師,也順便去賽場溜(ci)達(tan)一(di)圈(qing)。隨便掃了一眼,驚呆了,龍哥他們居然已經開始寫 PPT 了,以前知道龍哥他們強,可是沒想到強到這個地步,還讓不讓你們一塊歡快地玩耍了。同時,也瞭解到了很多很是有意思的項目,好比用機器學習方法去自動調節 TiDB 的調度參數,用 Lua 給 TiKV 添加 UDF 之類的,在 TiDB 上面實現異構數據庫的關聯查詢(簡直就是 F1 的大一統,並且聽小道消息,他們都已經把 Join 推到 PG 上面去了,然而咱們還沒開始進入到核心開發流程),在 TiKV 上面實現時序數據庫和 Memcached 協議等等,甚至東旭都按捺不住本身 Hackathon 起來了(嘻嘻,能夠學學我啊 ;D )。
原本還想去聊聊各個項目的具體實現方案,可是一想到本身挖了一堆坑還沒填,只能默默回去膜拜 TiNiuB 項目。看起來不能太佛繫了,因而乎我趕忙召開了一次內部團隊 sync 的 catch up,明確下分工,川總開始死磕 TBSSQL 的核心邏輯 Streaming Aggregation 的實現,我這邊繼續搞不帶 Aggregation 的 Streaming SQL 的其餘實現,GZY 已經部署起來了 Pulsar,開始準備 Mock 數據,WPH 輔助 GZY 同時也快速理解咱們的 Demo 場景,着手設計實現前端展示。
我這邊和麪帶慈父般欣慰笑容的老師(張建)進行了一些技術方案實現上的交流後,瞭解到目前社區小夥伴已經在搞 CREATE TABLE AS SELECT 的重要信息(後續證實此信息值大概一千塊 RMB)。
此時,在解決了以前的問題以後,TBSSQL 終於能跑通簡單的 SELECT 語句了。咱們內心稍微有點底了,因而一氣呵成,順路也實現了帶 Where 條件的 Stream Table 的 SELECT,以及 Stream Table 和 TiDB Table 的多表 Join,到這裏,此時,按照分工,我這邊的主體工做除了 Streaming Position 的持久化支持之外,已經寫的差很少了,剩下就是去實現一些 Nice to have 的 DDL 的語法支持。川總這裏首先要搞的是基於時間窗口的 Streaming Aggregation。按照咱們的如意算盤,這裏基本上能夠複用 TiDB 現有的 Hash Aggregation 的計算邏輯,只須要加上窗口的處理就完事兒了。
不過實際下手的時候仔細一研究代碼,發現 Aggregation 這一塊代碼在川總疏於研究這一段時間已經被重構了一把,加上了一個併發執行的分支,看起來還挺複雜。因而一不作二不休,川總把 Hash Aggregation 的代碼拷了一份,刪除了併發執行的邏輯,在比較簡單的非併發分支加上窗口相關實現。不過這種方法意味着帶時間窗口的 Aggregation 得單獨出 Plan,Planner 上又得改一大圈。這一塊弄完之後,還沒來得及調試,就到吃晚飯的點兒了。
吃完晚飯,由於下午死磕的比較厲害,我和張建、川總出門去園區溜達了一圈。期間張建問咱們搞得咋樣了,我望了一眼川總,語重心長地說主要成敗已經不在我了(後續證實這句語重心長至少也得值一千塊 RMB),川總果斷信心滿滿地說問題不大,一切盡在掌握之中。
沒想到這個 Flag 剛立起來仍是溫的,就立馬被打臉了。問題出在吃飯前搞的聚合那塊(具體細節能夠看下後面的坑系列),爲了支持時間窗口,咱們必須確保 Streaming 上的窗口列能透傳到聚合算子當中,爲此咱們屏蔽了優化器中窗口聚合上的列裁剪規則。但是實際運行當中,咱們的修改並無生效???而此時,川總昨天一整晚沒睡覺的反作用開始顯現出來了,思路已經有點不太清醒了。因而咱們把張建拖過來一塊兒 debug。而後我這邊也把用 TiDB Global Variable 控制 Streaming Position 的功能實現了,而且和 GZY 這邊也實現了 Mock 數據。
以後,我也順路休息休息,畢竟川總這邊搞不定,咱們這邊搞的再好也沒啥用。除了觀摩川總和張建手把手,不,肩並肩結對小黑屋編程以外,我也順便申請了部署 Kafka 聯調的機器。
咱們這邊最核心的功能還沒突破,亮眼的 CREATE TABLE AS SELECT Streaming 也還沒影,其實中期進度仍是偏慢了(或者說以前我設計實現的功能的工做量太大了,看起來今天晚上只能死磕了,囧)。我調試 Kafka 死活調不通,端口能夠 Telnet 登錄,可是寫入和獲取數據的時候一直報超時錯誤,並且我這邊已經開始困上來了,有點扛不動了,後來在 Kafka 老司機 WPH 一塊兒看了下配置參數,才發現 Advertise URL 設置成了本地地址,換成對外的 IP 就行了,固然爲了簡單方便,咱們設置了單 Partition 的 Topic,這樣 collector 的 Kafka 部分就搞的差很少了,剩下就是實現一個 http 的 restful api 來提供給 TiDB 的 StreamReader 讀取,整個連通工做就差很少了。
這時候川總那邊也傳來了好消息,終於從 Streaming Aggregation 這個大坑裏面爬出來了,後面也比較順利地搞定了時間窗口上的聚合這塊。此時時間已經到了 Hackathon 的次日,很多其餘項目的小夥伴已經收攤回家了。不過咱們抱着能多作一個 Feature 是一個的心態,決定挑燈夜戰。首先,川總把 Sort Executor 改了一把以支持時間窗口,可能剛剛的踩坑經歷爲咱們攢了人品,Sort 上的改動居然一次 AC 了。藉着這股勁兒,咱們又回頭優化了一把 SHOW CREATE STREAM 的輸出。
這裏有個插曲就是爲了近距離再回味和感覺下以前的開發流程,咱們特地在 TiDB 的 repo 裏面開了一個 tiboys/hackathon 的分支,而後提交的時候用了標準的 Pull Request 的方式,點讚了才能 merge(後來想一想打 Hackathon 不是太可取,沒什麼用,還挺耽誤時間,不知道當時怎麼想的),因此在 master 分支和 tiboys/hackathon 分支看的時候都沒有任何提交記錄。嘻嘻,估計龍哥也沒仔細看咱們的 repo,因此其實在龍哥的激勵下,咱們的效率仍是能夠的 :) 。
GZY 和 WPH 把今天安排的工做完成的差很少了,並且次日還靠他們主要準備 Demo Show,就去睡覺了,川總也已經困得不行了,準備打烊睡覺。我和川總合計了一下,還差一個最重要的 Feature,抱着就試一把,不行就手工的心態,咱們把社區的小夥伴王聰(bb7133)提的支持 CREATE TABLE AS SELECT 語法的 PR 合到了咱們的分支,衝突居然不是太多,而後稍微改了一下來支持 Streaming,結果一運行奇蹟般地發現居然可以運行,RP 全面爆發了,因而咱們就近乎免費地增長了一個 Feature。改完這個地方,川總實在堅持不住了,就回去睡了。我這邊的 http restful api 也搞的差很少了,準備聯調一把,StreamReader 經過 http client 從 collector 讀數據,collector 經過 kafka consumer 從 kafka broker 獲取數據,結果獲取的 Json 數據序列化成 TiDB 自定義的 Time 類型總是出問題,因而我又花了一些時間給 Time 增長了 Marshall 和 Unmarshal 的格式化支持,到這裏基本上能夠 work 了,看了看時間,凌晨四點半,我也準備去睡了。期間好幾回看到霸哥(韓飛)凌晨還在一直幫小(tian)夥(zi)伴(ji)查(wa)問(de)題(keng),其實霸哥認真的時候仍是很是靠譜的。
這個時候人陸陸續續地來了,我這邊也進入了打醬油的角色,年紀大了確實剛不動了,吃了早餐以後,開始準備思考接下來的分工。由於你們都是臨時組隊,到了 Hackathon 才碰面,基本上沒有太多磨合,並且廣泛次日狀態都不大好。雖然你們都很努力,可是在我以前設計的宏大項目面前,仍是感受人力不太夠,因此早上 10 點咱們開了第二次 sync 的 catch up,討論接下來的安排。我去負責更新代碼和 GitHub 的 Readme,川總最後再簡單對代碼掃尾,順便和 GZY 去錄屏(羅伯特小姐姐介紹的不翻車經驗),WPH 準備畫圖和 PPT,由於時間有限,前端展示部分打算從賣家秀直接轉到買家秀。11 點敲定代碼徹底封板,而後安心準備 PPT 和下午的 Demo。
由於抽籤抽的比較靠後,主要事情在 WPH 這邊,我和川總基本上也沒什麼大事了,順手搞了幾幅圖,而後跟馬老師還有其餘項目的小夥伴們開始八卦聊天。由於正好週末,家裏妹子買東西順便過來慰問了下。下午主要聽了各個 Team 的介紹,欣賞到了極盡浮誇的 LOGO 動畫,Get 到了有困難找 Big Brother 的新技能,學習和了解了頗有意思的 Idea,真心以爲這屆 Hackathon 作的很是值得回憶。
從最後的現場展現狀況來看,由於 TBSSQL 內容比較多,真的展現下來,感受 6 分鐘時間仍是太趕,好在 WPH Demo 的仍是很是順利的,把咱們作的事情都展現出來了。由於砍掉了一些前端展示的部分(這塊咱們也確實不怎麼擅長),其實對於 Hackathon 項目是很是吃虧的,不過有一點比較欣慰,就像某光頭大佬說的,評委們都是懂技術的。由於實現完整性方面能作的也都搞差很少了,打的雖然很累可是也很開心,對於結果也就不怎麼糾結了。
由於川總晚上的飛機,小夥伴們簡單溝通了幾句,一致贊成去園區找個地吃個晚飯,因而你們拉上霸哥去了「頭一號」,也是第一次吃了大油條,中間小夥伴們各類黑誰誰誰寫的 bug 巴拉巴拉的,後來看手機羣裏有人 @ 我說拿獎了。
其實不少項目各方面綜合實力都不錯,能夠說是各有特點,很難說的上哪一個項目有絕對的優點。咱們以前有討論過,TBSSQL 有獲獎的贏面,畢竟從完整性,實用性和生態方面都是有潛質的,可是能得到你們最高的承認仍是小意外的,特別感謝各位技術大佬們,也特別感謝幫助咱們領獎的滿分羅伯特小姐姐。
最後你們補了一張合照,算是爲此次 Hackathon 畫下一個句號。
至此,基本上 Hackathon 的流水帳就記錄完了,整個項目地址在 https://github.com/qiuyesuifeng/tidb 歡迎你們關注和討論。
TLDR: 文章很長,挑感興趣的部分看看就能夠了。
在前期分析和準備以後,基本上就只有在 TiDB 上作 SQL Streaming 引擎一條路可選了,細化了下要實現的功能以及簡單的系統架構,感受工做量仍是很是大的。
下面簡單介紹下系統架構和各個模塊的功能:
在數據源採集部分(collector),咱們計劃選取幾種典型的數據源做爲適配支持。
最流行的開源 MQ 系統,不少 Streaming 系統對接的都是 Kafka。
流行的開源 MQ 系統,目前比較火爆,有趕超 Kafka 的勢頭。
支持 MySQL/TiDB Binlog 處理,至關因而 MySQL Trigger 功能的升級增強版了。咱們對以前的 MySQL -> TiDB 的數據同步工具 Syncer 也比較熟悉,因此這塊工做量應該也不大。
常見的 Log 日誌,這個就沒什麼好解釋的了。
爲了方便 Demo 和協做,collector 除了適配不一樣的數據源,還會提供一個 restful api 的接口,這樣 TBSSQL 就能夠經過 pull 的方式一直獲取 streaming 的數據。由於 collector 主要是具體的工程實現,因此就不在這裏細節展開了,感興趣的話,能夠參考下 相關代碼。
要在 TiDB 中實現 Streaming 的功能即 TBSSQL,就須要在 TiDB 內部深刻定製和修改 TiDB 的核心代碼。
Streaming 有兩個比較本質的特徵:
所以,要在 TiDB SQL 引擎上支持 Streaming SQL,所涉及到的算子都須要根據 Streaming 的這兩個特色作修改。以聚合函數(Aggregation)爲例,按照 SQL 語義,聚合算子的實現應該分紅兩步:首先是 Grouping, 即對輸入按照聚合列進行分組;而後是 Execute, 即在各個分組上應用聚合函數進行計算,以下圖所示。
對於 Streaming,由於其輸入能夠是無盡的,Grouping 這個階段永遠不可能結束,因此按照老套路,聚合計算就無法作了。這時,就要根據 Streaming 的時序特性對 Streaming 數據進行分組。每個分組被稱爲一個 Time Window(時間窗口)。就拿最簡單的 Tumbling Window 來講,能夠按照固定的時間間隔把 Streaming 輸入切分紅一個個相互無交集的窗口,而後在每個窗口上就能夠按照以前的方式進行聚合了。
聚合算子只是一個比較簡單的例子,由於其只涉及一路輸入。若是要修改多路輸入的算子(好比說 Join 多個 Streaming),改動更復雜。此外,時間窗口的類型也是多種多樣,剛剛例子中的 Tumbling Window 只是基礎款,還有複雜一點的 Hopping Window 以及更復雜的 Sliding Window。在 Hackathon 的有限時間內,咱們既要考慮實現難度,又要突出 Batch / Streaming 融合處理的特色,所以在技術上咱們作出以下抉擇:
CREATE TABLE AS SELECT xxx FROM streaming
的相似語法。此外,既然是要支持 Streaming SQL,選擇合適的 SQL 語法也是必要的,須要在 Parser 和 DDL 部分作相應的修改。單整理下,咱們的 Feature List 以下圖所示:
下面具體聊聊咱們實現方案中的一些關鍵選擇。
Streaming SQL 語法的核心是時間窗口的定義,Time Window 和通常 SQL 中的 Window Function 其實語義上是有區別的。在 Streaming SQL 中,Time Window 主要做用是爲後續的 SQL 算子限定輸入的範圍,而在通常的 SQL 中,Window Funtion 自己就是一個 SQL 算子,裏面的 Window 其實起到一個 Partition 的做用。
在純 Streaming 系統當中,這種語義的差異影響不大,反而還會由於語法的一致性下降用戶的學習成本,可是在 TBSSQL 這種 Batch / Streaming 混合場景下,同一套語法支持兩種語義,會對用戶的使用形成必定困擾,特別是在 TiDB 已經被衆多用戶應用到生產環境這種背景下,這種語義上的差異必定要體如今語法的差別上。
DDL 這一塊實現難度不大,只要照着 DDL源碼解析 依葫蘆畫瓢就行。這裏值得一提的是在 Meta 層,咱們直接(偷懶)複用了 TableInfo 結構(加了判斷是否爲 Streaming 的 Flag 和一些表示 Streaming 屬性的字段)來表示 Streaming Table。這個選擇主要是從實現難度上考慮的,畢竟複用現有的結構是最快最安全的。可是從設計思想上看,這個決定其實也暗示了在 TBSSQL 當中,Streaming 是 Table 的一種特殊形式,而不是一個獨立的概念。理解這一點很重要,由於這是一些其餘設計的依據。好比按照以上設定,那麼從語義上講,在同一個 DB 下 Streaming 和普通 Table 就不能重名,反之的話這種重名就是能夠接受的。
這一塊主要有兩個部分,一個是適配不一樣的數據源(collector),另外一個是將 Streaming 數據源引入 TiDB 計算引擎(StreamReader)。collector 這部分上面已經介紹過了,這裏就再也不過多介紹了。StreamReader 這一塊,主要要修改由 LogicalPlan 生成 PhysicalPlan(具體代碼),以及由 PhysicalPlan 生成 Executor Operator Tree 的過程(具體代碼)。StreamReader 的 Open 方法中,會利用 Meta 中的各類元信息來初始化與 collector 之間的鏈接,而後在 Next 方法中經過 Pull 的方式不斷拉取數據。
對時間窗口的處理
前面咱們提到,時間窗口是 Streaming 系統中的核心概念。那麼這裏就有一個重要的問題,Time Window 中的 Time 如何界定?如何判斷何時應該切換 Window?最容易想到,也是最簡單粗暴的方式,就是按照系統的當前時間來進行切割。這種方式問題很大,由於:
所以,比較合理的方式是以 Streaming 中的某一 Timestamp 類型的列來切分窗口,這個值由用戶在應用層來指定。固然 Streaming 的 Schema 中可能有多個 Timestamp 列,這裏能夠要求用戶指定一個做爲 Window 列。在實現 Demo 的時候,爲了省事,咱們直接限定了用戶 Schema 中只能有一個時間列,而且以該列做爲 Window 列([具體代碼](https://github.com/qiuyesuifeng/tidb/blob/656971da00a3b1f81f5085aaa277159868fca223/ddl/table.go#L58))。固然這裏帶來一個問題,就是 Streaming 的 Schema 中必須有 Timestamp 列,否則這裏就無法玩了。爲此,咱們在建立 Streaming 的 DDL 中加了 [檢查邏輯](https://github.com/qiuyesuifeng/tidb/blob/656971da00a3b1f81f5085aaa277159868fca223/ddl/ddl_api.go#L149),強制 Streaming 的 Schema 必須有 Timestamp 列(其實咱們也沒想明白當初 Hackathon 爲啥要寫的這麼細,這些細節爲後來通宵埋下了濃重的伏筆,只能理解爲程序猿的本能,但願這些代碼你們看的時候吐槽少一些)。
這裏簡單 DML 指的就是不依賴時間窗口的 DML,好比說只帶 Selection 和 Projection 的SELECT 語句,或者單個 Streaming Join 多個 Table。由於不依賴時間窗口,支持這類 DML 實際上不須要對計算層作任何改動,只要接入 Streaming 數據源就能夠了。
對於 Streaming Join Table(如上圖表示的是 Stream Join User&Ads 表的示意圖) 能夠多說一點,若是不帶 Time Window,其實這裏須要修改一下Planner。由於 Streaming 的流式特性,這裏可能無法獲取其完整輸入集,所以就無法對 Streaming 的整個輸入進行排序,因此 Merge Join 算法這裏就無法使用了。同理,也沒法基於 Streaming 的整個輸入建 Hash 表,所以在 Hash Join 算法當中也只能某個普通表 Build Hash Table。不過,在咱們的 Demo 階段,輸入其實也是仍是有限的,因此這裏其實沒有作,倒也影響不大。
在 TBSSQL 當中,咱們實現了基於固定時間窗的 Hash Aggregation Operator 和 Sort Operator。這裏比較正規的打法其實應該是實現一個獨立的 TimeWindow,各類基於時間窗口的 Operator 能夠切換時間窗的邏輯,而後好比 Aggregation 和 Sort 這類算子只關心本身的計算邏輯。 可是這樣一來要對 Planner 作比較大的改動,想一想看難度太大了,因此咱們再一次採起了直(tou)接(lan)的方法,將時間窗口直接實現分別實如今 Aggregation 和 Sort 內部,這樣 Planner 這塊不用作傷筋動骨的改動,只要在各個分支邏輯上修修補補就能夠了。
對於 Aggregation,咱們還作了一些額外的修改。Aggregation 的輸出 Schema 語義上來講只包括聚合列和聚合算子的輸出列。可是在引入時間窗口的狀況下,爲了區分不一樣的窗口的聚合輸出,咱們爲聚合結果顯式加上了兩個 Timestamp 列 window_start
和 window_end
, 來表示窗口的開始時間和結束時間。爲了此次這個小特性,咱們踩到一個大坑,費了很多勁,這個後面再仔細聊聊。
由於 TiDB 自己目前還暫時不支持 CREATE TABLE AS SELECT … 語法,而從頭開始搞的話工做量又太大,所以咱們一度打算放棄這個 Feature。後面通過老司機提醒,咱們發現社區的小夥伴王聰(bb7133)已經提了一個 PR 在作這個事情了。本着試一把的想法咱們把這個 PR 合到咱們的分支上一跑,結果居然沒多少衝突,還真能 Work…...稍微有點問題的是若是 SELECT 子句中有帶時間窗口的聚合,輸出的結果不太對。仔細研究了一下發現,CREATE TABLE AS SELECT 語句中作 LogicalPlan 的路徑和直接執行 SELECT 時作 LogicalPlan 的入口不太一致,以致於對於前者,咱們作 LogicalPlan 的時候遺漏了一些 Streaming 相關信息。這裏稍做修改之後,也可以正常運行了。
本着前人採坑,後人儘可能少踩的心態聊聊遇到的一些問題,主要的技術方案上面已經介紹的比較多了。限於篇幅,只描述遇到的最大的坑——消失的窗口列的故事。在作基於時間窗口的 Aggregation 的時候,咱們要按照用戶指定的窗口列來切窗口。可是根據 列裁剪 規則,若是這個窗口列沒有被用做聚合列或者在聚合函數中被使用,那麼這一列基本上會被優化器裁掉。這裏的修改很簡單(咱們覺得),只須要在聚合的列裁剪邏輯中,若是發現聚合帶時間窗口,那麼直接不作裁剪就完事兒了(代碼)。三下五除二修改完代碼,編譯完後一運行,結果……瞬間 Panic 了……Debug 一看,發現剛剛的修改沒有生效,Streaming 的窗口列仍是被裁剪掉了,隨後咱們又把 Planner 的主要流程看了一遍,仍是沒有在其餘地方發現有相似的裁剪邏輯。
這時咱們意識到事情沒有這麼簡單了,趕緊從導師團搬來老司機(仍是上面那位)。咱們一塊兒用簡單粗暴的二分大法和 Print 大法,在生成 LogicalPlan,PhysicalPlan 和 Executor 先後將各個算子的 Schema 打印出來。結果發現,在 PhysicalPlan 完成後,窗口列仍是存在的,也就是說咱們的修改是生效了的,可是在生成 Executor 之後,這一列卻神祕消失了。因此一開始咱們定位的思路就錯了,問題出在生成 Executor 的過程,可是咱們一直在 Planner 中定位,固然找不到問題。
明確了方向之後,咱們很快就發現了元兇。在 Build HashAggregation 的時候,有一個不起眼的函數調用 buildProjBelowAgg,這個函數悄悄地在 Aggregation 算子下面加塞了一個 Projection 算子,順道又作了一把列裁剪,最爲頭疼的是,由於這個 Projection 算子是在生成 Executor 階段才塞進去的,而 EXPLAIN 語句是走不到這裏來的,因此這個 Projection 算子在作 Explain 的時候是看不見的,想當因而一個隱形的算子,因此咱們就這樣華麗麗地被坑了,因而就有了羅伯特小姐姐聽到的那句 「xxx,出來捱打」 的橋段。
從立項之初,咱們就指望 TBSSQL 可以做爲一個正式的 Feature 投入生產環境。爲此,在設計和實現過程當中,若是能用比較優雅的解決方案,咱們都儘可能不 Hack。可是因爲時間緊迫和能力有限,目前 TBSSQL 仍是處於 Demo 的階段,離實現這個目標還有很長的路要走。
在對接 Streaming 數據源這塊,目前 TBSSQL 有兩個問題。首先,TBSSQL 默認輸入數據是按照窗口時間戳嚴格有序的。這一點在生產環境中並不必定成立(好比由於網絡緣由,某一段數據出現了亂序)。爲此,咱們須要引入相似 Google MillWheel 系統中 Low Watermark 的機制來保證數據的有序性。其次,爲了保證有序,目前 StreamReader 只能單線程運行。在實際生產環境當中,這裏極可能由於數據消費速度趕不上上游數據生產速度,致使上游數據源的堆積,這又會反過來致使產生計算結果的時間和數據生產時間之間的延遲愈來愈大。爲了解決這個問題,咱們須要將 StreamReader 並行化,而這又要求基於時間窗口的計算算子可以對多路數據進行歸併排序。另外,目前採用 TiDB Global Variable 來模擬 Streaming 的位置信息,其實更好地方案是設計用一個 TiDB Table 來記錄每一個不一樣 StreamReader 讀取到的數據位置,這種作法更標準。
在 Planner 這塊,從前面的方案介紹能夠看出,Streaming 的流式特性和時序特性決定了 Streaming SQL 的優化方式和通常 SQL 有所不一樣。目前 TBSSQL 的實現方式是在現有 Planner 的執行路徑上加上一系列針對 Streaming SQL 的特殊分支。這種作法很不優雅,既難以理解,也難以擴展。目前,TiDB 正在基於 Cascade 重構 Planner 架構,咱們但願從此 Streaming SQL 的相關優化也基於新的 Planner 框架來完成。
目前,TBSSQL 只實現了最簡單的固定窗口。在固定窗口上,Aggregation、Sort 等算子很大程度能複用現有邏輯。可是在滑動窗口上,Aggregation、Sort 的計算方式和在 Batch Table 上的計算方式會徹底不同。從此,咱們但願 TBSSQL 可以支持完善對各類時間窗口類型的支持。
目前 TBSSQL 只能處理單路 Streaming 輸入,好比單個 Streaming 的聚合,排序,以及單個Streaming 和多個 Table 之間的 Join。多個 Streaming 之間的 Join 由於涉及多個 Streaming 窗口的對齊,目前 TBSSQL 暫不支持,因此 TBSSQL 目前並非一個完整的 Streaming SQL 引擎。咱們計劃從此對這一塊加以完善。
TBSSQL 是一個複雜的工程,要實現 Batch/Streaming 的融合,除了以上提到這四點,TBSSQL 還有頗有不少工做要作,這裏就不一一詳述了。或許,下次 Hackathon 能夠再繼續搞一把 TBSSQL 2.0 玩玩:) 有點遺憾的是做爲選手出場,沒有和全部優秀的參賽的小夥伴們暢談交流,但願有機會能夠補上。屬於你們的青春不散場,TiDB Hackathon 2019,不見不散~~