做者:楊克特/伍翀 整理:徐前進(Apache Flink Contributor) 校對:楊克特/伍翀html
本文整理自12月20日在北京舉行的Flink-forward-china-2018大會, 分享嘉賓楊克特:花名魯尼,Apache Flink Committer,2011年碩士畢業加入阿里,參與多個計算平臺核心項目的設計和研發,包括搜索引擎、調度系統、監控分析等。目前負責Blink SQL的研發和優化。 分享嘉賓伍翀:花名雲邪,Apache Flink Committer,從Flink v1.0就參與貢獻,從事Flink Table & SQL相關工做已有三年,目前在阿里Blink SQL項目組。數據庫
文章概述:基於Flink以流爲本的計算引擎去構建一個流與批統一的解決方案 本文主要從如下5個方面來介紹基於Flink Streaming構建統一的數據處理引擎的挑戰和實踐。apache
爲何要統一批和流數組
什麼是統一的SQL處理引擎緩存
如何統一批和流性能優化
性能表現網絡
將來計劃數據結構
通常公司裏面都會有一個比較傳統的批處理系統天天去算一些報表,隨着愈來愈多更實時的需求,你們也許會採用 Storm、Flink、Spark 來作流計算,同時又會在邊上跑一個批處理,以小時或天的粒度去計算一個結果,來實現兩邊的校驗。這個就是經典的 Lambda 架構。架構
可是這個架構有不少問題,好比用戶須要學習兩套引擎的開發方式,運維須要同時維護兩套系統。更重要的問題是,咱們須要維護兩套流程,一套增量流程,一套全量流程,同時這兩套流程之間必需要有必定的自洽性,它們必需要保證一致。當業務變得愈來愈複雜的時候,這種一致性自己也會成爲一個挑戰。框架
這也是 Flink SQL 但願解決的問題,但願經過 Flink SQL,無論是開發人員仍是運維人員,只須要學習一套引擎,就能解決各類大數據的問題。也不只限於批處理或者說流計算,甚至能夠更多。例如:支持高時效的批處理達到 OLAP 的效果,直接用SQL語法去作復瑣事件處理(CEP),使用 Table API 或 SQL 來支持機器學習,等等。在 SQL 上還有很是多的想象空間。
那麼什麼是統一的 SQL 處理引擎呢?統一的 SQL 引擎道路上有哪些挑戰呢?
從用戶的角度來講,一句話來描述就是「一份代碼,一個結果」。也就是隻須要寫一份代碼,流和批跑出來的最終結果是同樣。這個的好處是,用戶不再用去保證增量流程和全量流程之間的一致性了,這個一致性將由 Flink 來保證。
從開發的角度來看,其實更加關注底層架構的統一,好比說一些技術模塊是否是足夠通用,流和批模式下是否能作到儘量地複用。精心設計的高效數據結構是否是能夠普遍地應用在引擎的不少模塊中。
首先咱們有一張用戶的分數表 USER_SCORES
,裏面有用戶的名字、得分和得分時間。經過這張表來作一個很是簡單的統計,統計每一個用戶的總分,以及獲得這個分數的最近時間是多少? 從流計算跟批處理的角度,無論是作一張離線報表,仍是實時不斷地產出計算結果,它們的SQL是徹底如出一轍的,就是一個簡單的GroupBy分組,求和,求max。
如上圖有一張源數據表,有名字,分數,事件時間。
對於批處理,經過這樣一個SQL能夠直接拿到最終的結果,結果只會顯示一次,由於在數據消費完以後,纔會輸出結果。
對於流處理,SQL 也是如出一轍的。假設流任務是從 12:01 開始運行的,這時候尚未收到任何消息,因此它什麼結果都沒輸出。隨着時間推移,收到了第一條 Julie 的得分消息,此時會輸出 Julie 7 12:01
。當到達12:04分時,輸出的結果會更新成 Julie 8 12:03
,由於又收到了一條 Julie 的得分消息。這個結果對於最終的結果來說多是不對的,可是至少在 12:04 這一刻,它是一個正確的結果。當時間推動到當前時間(假設是 12:08 分),全部已產生的消息都已消費完了,能夠看到這時候的輸出結果和批處理的結果是如出一轍的。
這就是「一份代碼,一個結果」。其實從用戶的角度來說,流計算跟批處理在結果正確性上並無區別,只是在結果的時效性上有一些區別。
對於開發人員來講,引擎的統一又是意味着什麼?這張圖是目前Flink的架構圖,最上層 Table API 與 SQL。在執行以前,會根據執行環境翻譯成 DataSet 或 DataStream 的 API。這兩個 API 之間仍是有比較大的不一樣,咱們能夠放大以後看看。好比 DataSet API 是Flink批處理的API,它本身有一個優化器。 可是在DataStream API下,就是一些比較直觀簡單的翻譯。而後在運行時,他們也是依託於不一樣的task。在 DataStream 這邊,主要是運行Stream Task,同時在裏面會運行各類各樣的operator。 在批處理這邊主要是運行 Batch Task 和 Driver。這個主要是執行模式的區別。
在代碼上能有多少能共用的呢?好比說要實現一個 INNER JOIN,以如今的代碼距離,若是要在流上實現 INNER JOIN,首先會把兩路輸入變成兩個DataStream,而後把兩個輸入 connect 起來,再進行 keyBy join key,最後實現一個 KeyedCoProcessOperator
來完成 join 的操做。可是在 DataSet 這邊,你會發現 API 就不太同樣了,緣由是 DataSet 底下是有個優化器的。換句話說,DataSet 的 API 有些是聲明式的,DataStream 的 API 是命令式的。從這個例子上來看,對於咱們開發人員來講,在流計算或者是在批處理下實現 JOIN 所面對的 API 實際上是比較不同的,因此這也很難讓咱們去複用一些代碼,甚至是設計上的複用。這個是API 上的區別,到了Runtime以後,比較大的不一樣是 Stream Task 跟 Batch Task 的區別。
在一個經典的pull的模式下,首先會有執行器開始向執行的,能夠理解爲程序入口,它會向最後一個節點請求最終的結果。最後一個節點(求和節點)會向前一個節點(過濾節點)請求數據,而後再向前一個節點請求數據直到源節點。源節點就會本身去把數據讀出來,而後一層層往下傳遞,最終最後一個節點計算完求和後返回給程序入口。這和函數調用棧很是相像。
在 push 模式下,在程序開始執行的時候,咱們會直接找到 DAG 的頭節點,全部的數據和控制消息都由這個頭節點往下發送,控制流會跟數據流一塊兒,至關於它同時作一個函數調用,而且把數據發送給下一個算子,最終達到求和的算子。
經過這個簡單例子,你們能夠體會一下,這兩個在執行模式上有很大的不一樣,這會在 runtime 統一上帶來不少問題,但其實他們完成的功能是相似的。
咱們在深度統一流和批的道路上遇到了這麼多挑戰,那麼是如何作到統一的呢?
首先,你們已經愈來愈認同 SQL 是大數據處理的通用語言,不只僅是由於 SQL 是一個很是易於表達的語言,還由於 SQL 是一個很是適合於流批統一的語言。可是在傳統的 SQL 裏面,SQL 是一直做用在「表」上的,不是做用在「流」上的。怎麼樣讓 SQL 可以做用在流上,並且讓流式 SQL 的語義、結果和批同樣呢?這是咱們遇到的第一個問題。爲了解決這個問題,咱們和社區提出了「流表對偶性」還有「動態表」的這兩個理論基礎。在這裏,這個理論基礎咱們這裏就不展開了,感興趣的同窗能夠去官網上閱讀下這篇文章。你們只須要知道只有在基於這兩個理論的基礎上,流式SQL的語義纔可以保證和批的語義是同樣的,結果是同樣的。
如圖是對架構上的一些改進。架構的改進主要集中在中間兩層,在 Runtime 層咱們加強了現有的 Operator 框架,使得能支持批算子。在 Runtime 之上,咱們提出了一個 Query Processor 層,包括查詢優化和查詢執行,Table API & SQL 再也不翻譯到 DataStream 和 DataSet ,而是架在 Query Processor 之上。
在 Runtime 層,首先實現的是 Runtime DAG 層的統一,基於統一的DAG層之上,再去構建流的算子和批的算子。爲了統一流和批的最底層的API,引入了一個統一的Operator層抽象。批算子的實現再也不基於 DataSet 底層 Deriver 接口實現,都基於 StreamOperator 接口來實現了。這樣流和批都使用了統一的 Operator API 來實現。
除此以外,針對批的場景,咱們對 Operator 框架作了些擴展使得批能得到額外的優化。
第一點是 Operator 能夠自主的選擇輸入邊,例如hashjoin,批的hashjoin通常先會把build端處理完,把哈希表先build起來,而後再去處理另一邊的probe端。
第二點是更加靈活的 Chaining 的策略。StreamOperator 的默認 Chaining 策略只能將單輸入的 operator chain在一塊兒。可是在批的一些場景,咱們但願可以對多輸入的Operator也進行Chaining。好比說兩個 Join Operator,咱們但願也可以 Chaining 在一塊兒,這樣這兩個 Join Operator 之間的數據shuffle 就能夠省掉。
關於統一的 Operator 框架,咱們已經在社區裏面展開了討論,感興趣的同窗能夠關注一下這個討論連接。
而後講一下統一的Query Processor 層,無論是流計算仍是批處理的SQL,他們都將沿着相同的解析和優化的路徑往前走。在解析層,也就是將 SQL 和 Table API 代碼解析成邏輯計劃,這裏流和批完徹底全複用了同樣的代碼。而後在優化層流和批也使用了相同的優化器來實現,在優化器裏面,全部的優化的規則都是可插拔的。流和批共用了絕大部分的優化的規則,只有少部分的規則是流特有的,或者是批特有的。而後在優化以後,獲得了一個物理計劃,物理的計劃會通過翻譯成最終的Execution DAG,也就是咱們剛剛講的Stream Operator算子,最後會分佈式地運行起來。
在優化器這一層,很符合二八定律,也就是80%的優化規則都是流和批是共享的,好比說列裁剪、分區裁剪、條件下推等等這些都是共享的。還有20%的優化規則是流批特有的,通過咱們的研究發現比較有意思的一個規律。
批這邊優化規則,不少都是跟sort相關相關的,由於流如今不支持sort,因此 sort 能夠理解是批特有的,好比說一些sort merge join 的優化、sort agg的優化。
而流這邊所特有的一些規則,都是跟state相關的。爲何呢?由於目前流做業在生產中跑一個 SQL 的做業,通常會選擇使用 RocksDB 的 StateBackend。RocksDB StateBackend,它有一個很大的瓶頸,就是你每一次的操做,都會涉及到序列化和反序列化,因此說 State 操做會成爲一個流做業的瓶頸。因此如何去優化一個流做業,不少時候是思考如何節省State操做,如何減少State的size,如何避免存儲重複的State數據。這些都是目前流計算任務優化的立竿見影的方向。
這裏介紹一個流和批共用的高級一點的優化規則。你們能夠先看一下上圖左邊這個query,這是一個通過簡化以後的TBCH13的query,有一張用戶表customer,還有一張訂單表 orders,customer 表會根據 custkey 去 join 上 orders 這張表,而後 join 以後,再根據 custkey 來進行分組,統計出現過的訂單的數量。
梳理一下就是要統計每一個客戶下的訂單數,可是訂單的數據是存在orders表裏面的,因此就須要去join這個orders表。這個query通過解析以後,獲得的邏輯計劃就是中間這個圖。能夠看到customer表和orders表進行了join,join以後作了一個agg。可是這裏有一個問題,就是customer表和orders表都是兩張很是大的表,都是上億級別的。在批處理下,爲這個join去build哈希表的時候,要用到大量的buffer,甚至還須要落盤,這就可能致使這個join性能比較差。 在流處理下也是相似的,須要把customer表和orders表全部的數據都存到state裏面去。state越大,流處理性能也就越差。
因此說怎樣去節省和避免數據量是這個查詢優化的方向。咱們注意到customer它自己就帶了一個主鍵就是custkey,最後的agg也是針對 custkey 進行聚合統計的。那麼其實咱們能夠先對orders表作一個聚合統計,先統計出每一個用戶每一個custkey它下一個多少的訂單,而後再和customer表作一個join,也就是說把agg進行下推,下推到了join以前,這樣子,orders表就從原來的15億的數據量壓縮到了一個億,而後再進行join。這個對於流和批都是巨大的性能優化。咱們在流場景下測試發現從原先耗時六個小時提高到了14分鐘。
講這個例子目的是想說明 SQL 已經發展了幾十年了,有很是多的牛人在這個領域耕耘多年,已經有了很是多成熟的優化。這些優化,基於流批統一的模型,不少事能夠直接拿過來給流用的。咱們不須要再爲流在開發、研究一套優化規則,作到事半功倍的效果。
原先在 Flink 中無論流仍是批,具體幹活的算子之間傳遞的都是一種叫 Row 的數據結構。 可是Row有這麼幾個比較典型的問題。
Row結構很簡單,裏面就存了一個Object數組,好比說如今有一行數據,第一個是整形,另兩個是字符串。那麼row裏面就會有一個Int,還有兩個String。可是咱們知道Java在對象上,它會有一些額外的空間的一些開銷。
另外對於主類型的訪問,會有裝箱和拆箱的開銷。
還有在算Row的hashcode、序列化、反序列化時,須要去迭代Row裏面數組的每個元素的hashcode方法、反序列化方法、序列化方法,這就會涉及到不少額外的虛函數調用的開銷。
最後一點是對於一些稍微高級一點的數據結構,好比說排序器 sort,還有agg join中的一些hashtable,hashmap的這種二進制的數據結構,基於Row的這種封裝,很難去作到極致的效率。
因此針對這些問題,咱們也提出了一個全新的數據結構BinaryRow,而後它是徹底基於二進制的結構來設計的。BinaryRow 分紅了定長區和變長區,在定長區開頭是一個null bit的一個區間,用來記錄每一個字段是不是null值。而後像int,long,double這種定長的數據類型,咱們會直接把這個直接存在定長區裏面,而後string這種變長形的數據,咱們會把他的變長的數據存在變長區,而後把他的指針還有他的長度存在定長區。在存放數據的時候,BinaryRow 中每個數據塊都是八字節對齊的。 爲何八字節對齊?一方面是爲了更快的 random access,查找字段時不須要從頭遍歷,直接就能定位到字段的位置。另外一方面是可以作到更好的cpu的緩存。
BinaryRow 有一個比較重要的優勢:延遲反序列化。例如從網絡過來的二進制數據、從state拿到的二進制數據,不會立刻反序列化出來,而是會 wrap 成 BinaryRow,當須要的時候才進行反序列化,這能節省不少沒必要要的反序列化操做,從而提高性能。通過測試,這個數據結構不只在批處理中表現的很是優秀,在流處理中也獲得了一倍的性能提高。
維表關聯,你們應該都比較瞭解,就是一個流要去join一個存在外部的數據庫。咱們會拿流數據的 ID 去 lookup 維表,這個lookup的過程,咱們會實現成同步的模式或者是異步的模式。 咱們知道 DataStream 上支持了異步IO接口,可是DataSet是沒有的。不過因爲咱們統一了Operator層,因此說批能夠直接複用流的 operator 實現。雖然在傳統的批處理中,若是要查詢維表,會先把維表scan下來再作 JOIN。但若是說維表特別大,probe端特別小,這樣多是不划算的,使用lookup的方式可能會更高效一些,因此說這也是彌補了批在某些場景的一個短板。
爲了不對state的頻繁操做,咱們在流上引入了Micro-Batch 機制。實現方式就是在數據流中插入了一些 micro-batch 的事件。而後在Aggregate的Operator裏面,收到數據的時候,咱們就會把它存到或直接聚合到二進制的哈希表裏面(緩存到內存中)。而後當收到 micro-batch 事件的時候,再去觸發二進制的映射表(BinaryHashMap),將緩存的結果刷到 state 中,並將輸出最終結果。 這裏 BinaryHashMap 是徹底和批這邊複用的。流這邊沒有去從新造一套,在性能上也獲得了十倍的提高。
咱們先測試了一個批的性能,拿的是TPC-H去作一個測試。咱們與Flink1.6.0進行了比較,這個圖是在1T的數據量下每一個query的一個耗時的對比,因此說耗時越小,它的性能也就越好。能夠看出每個query,Blink都會比Flink1.6 要優秀不少。 平均性能上要比Flink1.6要快十倍。另外借助統一的架構,流也成功的攻克了全部的TBCH的query。值得一提的是,這是目前其餘引擎作不到的。還有在今年的天貓雙11上流計算,達到了17億的TPS這麼大的一個吞吐量。能達到這麼高的性能表現,離不開咱們今天聊的統一流批架構。
咱們會繼續探索流和批的一些結合,由於流和批並非非黑即白的,不是說批就是批做業,流就是流做業,流和批之間還有不少比較大的空間值得咱們去探索。好比說一個做業,他可能一部分是一個一直運行的流做業,另外一部分是一個間隔調度的批做業,他們之間是融合運行着的。再好比一個批做業運行完以後,怎麼樣可以無縫地把它遷移成一個流做業,這些都是咱們將來嘗試去作的一些研究的方向。
更多資訊請訪問 Apache Flink 中文社區網站