簡介:《實時數倉入門訓練營》由阿里雲研究員王峯、阿里雲高級產品專家劉一鳴等實時計算 Flink 版和 Hologres 的多名技術/產品一線專家齊上陣,協力搭建這次訓練營的課程體系,精心打磨課程內容,直擊當下同窗們所遇到的痛點問題。由淺入深全方位解析實時數倉的架構、場景、以及實操應用,7 門精品課程幫助你 5 天時間從小白成長爲大牛!
本文整理自直播《實時計算 Flink 版 SQL 實踐-李麟(海豹)》
視頻連接:https://developer.aliyun.com/learning/course/807/detail/13887數據庫
內容簡要:
1、實時計算Flink版SQL簡介
2、實時計算Flink版SQL上手示例
3、開發常見問題和解法
實時計算Flink版選擇了SQL這種聲明式語言做爲頂層API,比較穩定,也方便用戶使用。Flink SQL具有流批統一的特性,給用戶統一的開發體驗,而且語義一致。另外,Flink SQL可以自動優化,包括屏蔽流計算裏面State的複雜性,也提供了自動優化的Plan,而且還集成了AutoPilot自動調優的功能。Flink SQL的應用場景也比較普遍,包括數據集成、實時報表、實時風控,還有在線機器學習等場景。segmentfault
在基本操做上,能夠看到SQL的語法和標準SQL很是相似。示例中包括了基本的SELECT、FILTER操做。,可使用內置函數,如日期的格式化,也可使用自定義函數,好比示例中的匯率轉換就是一個用戶自定義函數,在平臺上註冊後就能夠直接使用。架構
在實際的數據處理過程當中,維表的Lookup Join也是一個比較常見的例子。 併發
這裏展現的是一個維表INNER JOIN示例。運維
例子中顯示的SOURCE表是一個實時變化的訂單信息表,它經過INNER JOIN去關聯維表信息,這裏標黃高亮的就是維表JOIN的語法,能夠看到它和傳統的批處理有一個寫法上的差別,多了FOR SYSTEM\_TIME AS OF這個子句來標明它是一個維表JOIN的操做。SOURCE表每來一條訂單消息,它都會觸發維表算子,去作一次對維表信息的查詢,因此把它叫作一個Lookup Join。dom
Window Aggregation(窗口聚合)操做也是常見的操做,Flink SQL中內置支持了幾種經常使用的Window類型,好比Tumble Window,Session Window,Hop Window,還有新引入的Cumulate Window。機器學習
Tumbleide
Tumble Window能夠理解成固定大小的時間窗口,也叫滾窗,好比說5分鐘、10分鐘或者1個小時的固定間隔的窗口,窗口之間沒有重疊。函數
Session工具
Session Window(會話窗口) 定義了一個連續事件的範圍,窗口定義中的一個參數叫作Session Gap,表示兩條數據的間隔若是超過定義的時長,那麼前一個Window就結束了,同時生成了一個新的窗口。
Hop
Hop Window不一樣於滾動窗口的窗口不重疊,滑動窗口的窗口之間能夠重疊。滑動窗口有兩個參數:size 和 slide。size 爲窗口的大小,slide 爲每次滑動的步長。若是slide < size,則窗口會重疊,同一條數據可能會被分配到多個窗口;若是 slide = size,則等同於 Tumble Window。若是 slide > size,窗口之間沒有重疊且有間隙。
Cumulate
Cumulate Window(累積窗口),是Flink社區1.13版本里新引入的,能夠對比 Hop Window來理解,區別是從Window Start開始不斷去累積。示例中Window 一、Window 二、Window 3是在不斷地增加的。它有一個最大的窗口長度,好比咱們定義Window Size是一天,而後Step步長是1個小時,那麼它會在一天中的每一個小時產生累積到當前小時的聚合結果。
看一個具體的Window聚合處理示例。
如上圖所示,好比說須要進行每5分鐘單個用戶的點擊數統計。
源數據是用戶的點擊日誌,咱們指望算出每5分鐘單個用戶的點擊總數, SQL 中使用的是社區最新的 WindowTVF語法,先對源表開窗,再 GROUP BY 窗口對應的屬性 window\_start和window\_end, COUNT(*)就是點擊數統計。
能夠看到,當處理12:00到12:04的數據,有2個用戶產生了4次點擊,分別能統計出來用戶Mary是3次,Bob是1次。在接下來一批數據裏面,又來了3條數據,對應地更新到下一個窗口中,分別是1次和2次。
相對於Window Aggregation來講,Group Aggregation直接觸發計算,並不須要等到窗口結束,適用的一個場景是計算累積值。
上圖的例子是單個用戶累積到當前的點擊數統計。從Query上看,寫法相對簡單一點,直接 GROUP BY user 去計算COUNT(*),就是累積計數。
能夠看到,在結果上和Window的輸出是有差別的,在與Window相同的前4條輸入數據,Group Aggregation輸出的結果是Mary的點擊數已更新到3次,具體的計算過程多是從1變成2再變成3,Bob是1次,隨着後面3條數據的輸入,Bob對應的點擊數又會更新成2次,對結果是持續更新的過程,這和Window的計算場景是有一些區別的。
以前Window窗口裏面輸出的數據,在窗口結束後結果就不會再改變,而在Group Aggregation裏,同一個Group Key的結果是會產生持續更新的。
更全面地對比一下Window和Group Aggregation的一些區別。
Window Aggregation在輸出模式上是按時輸出,是在定義的數據到期以後它纔會輸出。好比定義5分鐘的窗口,結果是延遲輸出的,好比00:00~00:05這個時間段,它會等整個窗口數據都到齊以後,才完整輸出出來,而且結果只輸出一次,不會再改變。
Group Aggregation是數據觸發,好比第一條數據來它就會輸出結果,同一個Key 的第二條數據來結果會更新,因此在輸出流的性質上二者也是不同的。Window Aggregation通常狀況下輸出的是Append Stream,而在Group Aggregation輸出的是Update Stream。
在狀態State處理上二者的差別也比較大。Window Aggregation會自動清理過時數據,用戶就不須要額外再去關注 State的膨脹狀況。Group Aggregation是基於無限的狀態去作累積,因此須要用戶根據本身的計算場景來定義State的TTL,就是State保存多久。
好比統計一天內累計的PV和UV,不考慮數據延遲的狀況,也至少要保證State的TTL要大於等於一天,這樣才能保證計算的精確性。若是State的TTL定義成半天,統計值就可能不許確了。
對輸出的存儲要求也是由輸出流的性質來決定的。在Window的輸出上,由於它是Append流,全部的類型都是能夠對接輸出的。而Group Aggregatio輸出了更新流,因此要求目標存儲支持更新,能夠用Hologres、MySQL或者HBase這些支持更新的存儲。
下面經過具體的例子來看每一種SQL操做在真實的業務場景中會怎麼使用,好比SQL基本的語法操做,包括一些常見的Aggregation的使用。
這裏的例子是電商交易數據場景,模擬了實時數倉裏分層數據處理的狀況。
在數據接入層,咱們模擬了電商的交易訂單數據,它包括了訂單ID,商品ID,用戶ID,交易金額,商品的葉子類目,交易時間等基本信息,這是一個簡化的表。
示例1會從接入層到數據明細層,完成一個數據清洗工做,此外還會作類目信息的關聯,而後數據的彙總層咱們會演示怎麼完成分鐘級的成交統計、小時級口徑怎麼作實時成交統計,最後會介紹下在天級累積的成交場景上,怎麼去作準實時統計。
- 示例環境:內測版
演示環境是目前內測版的實時計算Flink產品,在這個平臺能夠直接作一站式的做業開發,包括調試,還有線上的運維工做。
- 接入層數據
使用 SQL DataGen Connector 生成模擬電商交易數據。
接入層數據:爲了方便演示,簡化了鏈路,用內置的SQL DataGen Connector來模擬電商數據的產生。
這裏面order\_id是設計了一個自增序列,Connector的參數沒有完整貼出來。 DataGen Connector支持幾種生成模式,好比能夠用Sequence產生自增序列,Random模式能夠模擬隨機值,這裏根據不一樣的字段業務含義,選擇了不一樣的生成策略。
好比order\_id是自增的,商品ID是隨機選取了1~10萬,用戶ID是1~1000萬,交易金額用分作單位, cate\_id是葉子類目ID,這裏共模擬100個葉子類目,直接經過計算列對商品ID取餘來生成,訂單建立時間使用當前時間模擬,這樣就能夠在開發平臺上調試,而不須要去建立Kafka或者DataHub作接入層的模擬。
- 電商交易數據-訂單過濾
這是一個數據清洗的場景,好比須要完成業務上的訂單過濾,業務方可能會對交易金額有最大最小的異常過濾,好比要大於1元,小於1萬才保留爲有效數據。
交易的建立時間是選取某個時刻以後的,經過WHERE條件組合過濾,就能夠完成這個邏輯。
真實的業務場景可能會複雜不少,下面來看下SQL如何運行。
這是使用調試模式,在平臺上點擊運行按鈕進行本地調試,能夠看到金額這一列被過濾,訂單建立時間也都是大於要求的時間值。
從這個簡單的清洗場景能夠看到,實時和傳統的批處理相比,在寫法上包括輸出結果差別並不大,流做業主要的差別是運行起來以後是長週期保持運行的,而不像傳統批處理,處理完數據以後就結束了。
接下來看一下怎麼作維表關聯。
根據剛纔接入層的訂單數據,由於原始數據裏面是葉子類目信息,在業務上須要關聯類目的維度表,維度表裏面記錄了葉子類目到一級類目的關聯關係,ID和名稱,清洗過程須要完成的目標是用原始表裏面葉子類目ID去關聯維表,補齊一級類目的ID和Name。這裏經過INNER JOIN維表的寫法,關聯以後把維表對應的字段選出來。
和批處理的寫法差別僅僅在於維表的特殊語法FOR SYSTEM\_TIME AS OF。
如上所示,平臺上能夠上傳本身的數據用於調試,好比這裏使用了1個CSV的測試數據,把100個葉子類目映射到10個一級類目上。
對應葉子類目ID的個位數就是它一級類目的ID,會關聯到對應的一級類目信息,返回它的名稱。本地調試運行優勢是速度比較快,能夠即時看到結果。在本地調試模式中,終端收到1000條數據以後,會自動暫停,防止結果過大而影響使用。
接下來咱們來看一下基於Window的統計。
第一個場景是分鐘級成交統計,這是在彙總層比較經常使用的計算邏輯。
分鐘級統計很容易想到Tumble Window,每一分鐘都是各算各的,須要計算幾個指標,包括總訂單數、總金額、成交商品數、成交用戶數等。成交的商品數和用戶數要作去重,因此在寫法上作了一個Distinct處理。
窗口是剛剛介紹過的Tumble Window,按照訂單建立時間去劃一分鐘的窗口,而後按一級類目的維度統計每一分鐘的成交狀況。
- 運行模式
上圖和剛纔的調試模式有點區別,上線以後就真正提交到集羣裏去運行一個做業,它的輸出採用了調試輸出,直接Print到Log裏。展開做業拓撲,能夠看到自動開啓了Local-Global的兩階段優化。
- 運行日誌 - 查看調試輸出結果
在運行一段時間以後,經過Task裏面的日誌能夠看到最終的輸出結果。
用的是Print Sink,會直接打到Log裏面。在真實場景的輸出上,好比寫到Hologres/MySQL,那就須要去對應存儲的數據庫上查看。
能夠看到,輸出的數據相對於數據的原始時間是存在必定滯後的。
在19:46:05的時候,輸出了19:45:00這一個窗口的數據,延遲了5秒鐘左右輸出前1分鐘的聚合結果。
這5秒鐘實際上和定義源表時WATERMARK的設定是有關係的,在聲明WATERMARK時是相對gmt\_create字段加了5秒的offset。這樣起到的效果是,當到達的最先數據是 19:46:00 時,咱們認爲水位線是到了19:45:55,這就是5秒的延遲效果,來實現對亂序數據的寬容處理。
第二個例子是作小時級實時成交統計。
如上圖所示,當要求實時統計,直接把Tumble Window開成1小時Size的Tumble Window,這樣能知足實時性嗎?按照剛纔展現的輸出結果,具備必定的延遲效果。所以開一個小時的窗口,必須等到這一個小時的數據都收到以後,在下一個小時的開始,才能輸出上一個小時的結果,延遲在小時級別的,知足不了實時性的要求。回顧以前介紹的 Group Aggregation 是能夠知足實時要求的。
具體來看,好比須要完成小時+類目以及只算小時的兩個口徑統計,兩個統計一塊兒作,在傳統批處理中經常使用的GROUPING SETS功能,在實時Flink上也是支持的。
咱們能夠直接GROUP BY GROUPING SETS,第一個是小時全口徑,第二個是類目+小時的統計口徑,而後計算它的訂單數,包括總金額,去重的商品數和用戶數。
這種寫法對結果加了空值轉換處理便於查看數據,就是對小時全口徑的統計,輸出的一級類目是空的,須要對它作一個空值轉換處理。
上方爲調試模式的運行過程,能夠看到Datagen生成的數據實時更新到一級類目和它對應的小時上。
這裏能夠看到,兩個不一樣GROUP BY的結果在一塊兒輸出,中間有一列ALL是經過空值轉換來的,這就是全口徑的統計值。本地調試相對來講比較直觀和方便,有興趣的話也能夠到阿里雲官網申請或購買進行體驗。
第三個示例是天級累計成交統計,業務要求是準實時,好比說可以接受分鐘級的更新延遲。
按照剛纔Group Aggregation小時的實時統計,容易聯想到直接把Query改爲天維度,就能夠實現這個需求,並且實時性比較高,數據觸發以後能夠達到秒級的更新。
回顧下以前提到的Window和Group Aggregation對於內置狀態處理上的區別,Window Aggregation能夠實現State的自動清理,Group Aggregation須要用戶本身去調整 TTL。因爲業務上是準實時的要求,在這裏能夠有一個替代的方案,好比用新引入的Cumulate Window作累積的Window計算,天級的累積而後使用分鐘級的步長,能夠實現每分鐘更新的準實時要求。
回顧一下Cumulate Window,如上所示。天級累積的話,Window的最大Size是到天,它的Window Step就是一分鐘,這樣就能夠表達天級的累積統計。
具體的Query如上,這裏使用新的TVF語法,經過一個TABLE關鍵字把Windows的定義包含在中間,而後 Cumulate Window引用輸入表,接着定義它的時間屬性,步長和size 參數。GROUP BY就是普通寫法,由於它有提早輸出,因此咱們把窗口的開始時間和結束時間一塊兒打印出來。
這個例子也經過線上運行的方式去看Log輸出。
- 運行模式
能夠看到,它和以前Tumble Window運行的結構相似,也是預聚合加上全局聚合,它和Tumble Window的區別就是並不須要等到這一天數據都到齊了才輸出結果。
- 運行日誌 – 觀察調試結果
從上方示例能夠看到,在20:47:00的時候,已經有00:00:00到20:47:00的結果累積,還有對應的4列統計值。下一個輸出就是接下來的累計窗口,能夠看到20:47:00到20:48:00就是一個累計的步長,這樣既知足了天級別的累計統計需求,也可以知足準實時的要求。
而後咱們來總體總結一下以上的示例。
在接入層到明細層的清洗處理特色是相對簡單,也比較明確,好比業務邏輯上須要作固定的過濾條件,包括維度的擴展,這都是很是明確和直接的。
從明細層到彙總層,例子中的分鐘級統計,咱們是用了Tumble Window,而小時級由於實時性的要求,換成了Group Aggregation,而後到天級累積分別展現Group Aggregation和新引入的Cumulate Window。
從彙總層的計算特色來講,咱們須要去關注業務上的實時性要求和數據準確性要求,而後根據實際狀況選擇Window聚合或者Group 聚合。
這裏爲何要提到數據準確性?
在一開始比較Window Aggregation和Group Aggregation的時候,提到Group Aggregation的實時性很是好,可是它的數據準確性是依賴於State的TTL,當統計的週期大於TTL,那麼TTL的數據可能會失真。
相反,在Window Aggregation上,對亂序的容忍度有一個上限,好比最多接受等一分鐘,但在實際的業務數據中,可能99%的數據能知足這樣的要求,還有1%的數據可能須要一個小時後纔來。基於WATERMARK的處理,默認它就是一個丟棄策略,超過了最大的offset的這些數據就會被丟棄,不歸入統計,此時數據也會失去它的準確性,因此這是一個相對的指標,須要根據具體的業務場景作選擇。
上方是實時計算真實業務接觸過程當中比較高頻的問題。
首先是實時計算不知道該如何下手,怎麼開始作實時計算,好比有些同窗有批處理的背景,而後剛開始接觸Flink SQL,不知道從哪開始。
另一類問題是SQL寫完了,也清楚輸入處理的數據量大概是什麼級別,可是不知道實時做業運行起來以後須要設定多大的資源
還有一類是SQL寫得比較複雜,這個時候要去作調試,好比要查爲何計算出的數據不符合預期等相似問題,許多同窗反映無從下手。
做業跑起來以後如何調優,這也是一個很是高頻的問題。
1.實時計算如何下手?
對於上手的問題,社區有不少官方的文檔,也提供了一些示例,你們能夠從簡單的例子上手,慢慢了解SQL裏面不一樣的算子,在流式計算的時候會有一些什麼樣的特性。
此外,還能夠關注開發者社區實時計算 Flink 版、 ververica.cn網站、 B 站的Apache Flink 公衆號等分享內容。
逐漸熟悉了SQL以後,若是想應用到生產環境中去解決真實的業務問題,阿里雲的行業解決方案裏也提供了一些典型的架構設計,能夠做爲參考。
2.複雜做業如何調試?
若是遇到千行級別的複雜SQL,即便對於Flink的開發同窗來也不能一目瞭然地把問題定位出來,其實仍是須要遵循由簡到繁的過程,可能須要藉助一些調試的工具,好比前面演示的平臺調試功能,而後作分段的驗證,把小段SQL局部的結果正確性調試完以後,再一步一步組裝起來,最終讓這個複雜做業能達到正確性的要求。
另外,能夠利用SQL語法上的特性,把SQL組織得更加清晰一點。實時計算Flink產品上有一個代碼結構功能,能夠比較方便地定位長SQL裏具體的語句,這都是一些輔助工具。
3.做業初始資源設置,如何調優?
咱們有一個經驗是根據輸入的數據,初始作小併發測試一下,看它的性能如何,而後再去估算。在大併發壓測的時候,按照需求的吞吐量,逐步逼近,而後拿到預期的性能配置,這個是比較直接但也比較可靠的方式。
調優這一塊主要是藉助於做業的運行是狀況,咱們會去關注一些重點指標,好比說有沒有產生數據的傾斜,維表的Lookup Join須要訪問外部存儲,有沒有產生IO的瓶頸,這都是影響做業性能的常見瓶頸點,須要加以關注。
在實時計算Flink產品上集成了一個叫AutoPilot的功能,能夠理解爲相似於自動駕駛,在這種功能下,初始資源設多少就不是一個麻煩問題了。
在產品上,設定做業最大的資源限制後,根據實際的數據處理量,該用多少資源能夠由引擎自動幫咱們去調到最優狀態,根據負載狀況來作伸縮。
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。