Flink 靠什麼征服餓了麼工程師?

平臺現狀

下面是目前餓了麼平臺現狀架構圖:java

clipboard.png

來源於多個數據源的數據寫到kafka裏,計算引擎主要是Storm,Spark和Flink,計算引擎出來的結果數據再落地到各類存儲上。python

目前Storm任務大概有100多個,Spark任務有50個左右,Flink暫時還比較少。mysql

目前咱們集羣規模天天數據量有60TB,計算次數有1000000000,節點有400個。這裏要提一下,Spark和Flink都是on yarn的,其中Flink onyarn主要是用做任務間jobmanager隔離, Storm是standalone模式。程序員

應用場景

1.一致性語義redis

在講述咱們應用場景以前,先強調實時計算一個重要概念, 一致性語義:sql

1) at-most-once:即fire and forget,咱們一般寫一個java的應用,不去考慮源頭的offset管理,也不去考慮下游的冪等性的話,就是簡單的at-most-once,數據來了,無論中間狀態怎樣,寫數據的狀態怎樣,也沒有ack機制。apache

2) at-least-once: 重發機制,重發數據保證每條數據至少處理一次。api

3) exactly-once: 使用粗Checkpoint粒度控制來實現exactly-once,咱們講的exactly-once大多數指計算引擎內的exactly-once,即每一步的operator內部的狀態是否能夠重放;上一次的job若是掛了,可否從上一次的狀態順利恢復,沒有涉及到輸出到sink的冪等性概念。性能優化

4) at-least-one + idempotent = exactly-one:若是咱們能保證說下游有冪等性的操做,好比基於mysql實現 update on duplicate key;或者你用es, cassandra之類的話,能夠經過主鍵key去實現upset的語義, 保證at-least-once的同時,再加上冪等性就是exactly-once。架構

2. Storm

餓了麼早期都是使用Storm,16年以前仍是Storm,17年纔開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個概念:

1) 數據是tuple-based

2) 毫秒級延遲

3) 主要支持java, 如今利用apache beam也支持python和go。

4) Sql的功能還不完備,咱們本身內部封裝了typhon,用戶只須要擴展咱們的一些接口,就可使用不少主要的功能;flux是Storm的一個比較好的工具,只須要寫一個yaml文件,就能夠描述一個Storm任務,某種程度上說知足了一些需求,但仍是要求用戶是會寫java的工程師,數據分析師就使用不了。

2.1 總結

1) 易用性:由於使用門檻高,從而限制了它的推廣。

2)StateBackend:更多的須要外部存儲,好比redis之類的kv存儲。

3) 資源分配方面:用worker和slot提早設定的方式,另外因爲優化點作的較少,引擎吞吐量相對比較低一點。

3. Sparkstreaming

有一天有個業務方過來提需求說 咱們能不能寫個sql,幾分鐘內就能夠發佈一個實時計算任務。 因而咱們開始作Sparkstreaming。它的主要概念以下:

1) Micro-batch:須要提早設定一個窗口,而後在窗口內處理數據。

2) 延遲是秒級級別,比較好的狀況是500ms左右。

3) 開發語言是java和scala。

4)streaming SQL,主要是咱們的工做,咱們但願提供streaming SQL的平臺。

特色:

1) Spark生態和SparkSQL: 這是Spark比較好的地方,技術棧是統一的,SQL,圖計算,machine learning的包都是能夠互調的。由於它先作的是批處理,和Flink不同,因此它自然的實時和離線的api是統一的。

2) Checkpointon hdfs。

3) onyarn:Spark是屬於hadoop生態體系,和yarn集成度高。

4) 高吞吐: 由於它是Micro-batch的方式,吞吐也是比較高的。

下面給你們大體展現一下咱們平臺用戶快速發佈一個實時任務的操做頁面,它須要哪些步驟。咱們這裏不是寫DDL和DML語句,而是ui展現頁面的方式。

clipboard.png

頁面裏面會讓用戶選一些必要的參數, 首先會選哪個kafka集羣,每一個分區消費多少,反壓也是默認開啓的。消費位置須要讓用戶每次去指定,有可能用戶下一次重寫實時任務的時候,能夠根據業務需求去選擇offset消費點。

中間就是讓用戶描述pipeline。 SQL就是kafka的多個topic,輸出選擇一個輸出表,SQL把上面消費的kafka DStream註冊成表,而後寫一串pipeline,最後咱們幫用戶封裝了一些對外sink(剛剛提到的各類存儲都支持,若是存儲能實現upsert語義的話,咱們都是支持了的)。

3.1 MultiStream-Join

雖然剛剛知足通常無狀態批次內的計算要求,但就有用戶想說, 我想作流的join怎麼辦, 早期的Spark1.5能夠參考Spark-streamingsql這個開源項目把 DStream註冊爲一個表,而後對這個表作join的操做,但這隻支持1.5以前的版本,Spark2.0推出structured streaming以後項目就廢棄了。咱們有一個tricky的方式:

clipboard.png

讓Sparkstreaming去消費多個topic,可是我根據一些條件把消費的DStream裏面的每一個批次RDD轉化爲DataFrame,這樣就能夠註冊爲一張表,根據特定的條件,切分爲兩張表,就能夠簡單的作個join,這個join的問題徹底依賴於本次消費的數據,它們join的條件是不可控的,是比較tricky的方式。好比說下面這個例子,消費兩個topic,而後簡單經過filer條件,拆成兩個表,而後就能夠作個兩張表的join,但它本質是一個流。

clipboard.png

3.2 Exactly-once

clipboard.png

exactly-once須要特別注意一個點:

咱們必需要求數據sink到外部存儲後,offset才能commit,不論是到zk,仍是mysql裏面,你最好保證它在一個transaction裏面,並且必須在輸出到外部存儲(這裏最好保證一個upsert語義,根據unique key來實現upset語義)以後,而後這邊源頭driver再根據存儲的offeset去產生kafka RDD,executor再根據kafka每一個分區的offset去消費數據。若是知足這些條件,就能夠實現端到端的exactly-once. 這是一個大前提。

3.3 總結

1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):咱們要實現跨批次帶狀態的計算的話,在1.X版本,咱們經過這兩個接口去作,但仍是須要把這個狀態存到hdfs或者外部去,實現起來比較麻煩一點。

2) Real Multi-Stream Join:沒辦法實現真正的多個流join的語義。

3)End-To-End Exactly-Once Semantics:它的端到端的exactly-once語義實現起來比較麻煩,須要sink到外部存儲後還須要手動的在事務裏面提交offset。

4. STRUCTUREDSTREAMING

咱們調研而後並去使用了Spark2.X以後帶狀態的增量計算。下面這個圖是官方網站的:

clipboard.png

全部的流計算都參照了Google的 data flow,裏面有個重要的概念:數據的processing time和event time,即數據的處理時間和真正的發生時間有個gap。因而流計算領域還有個watermark,當前進來的事件水位須要watermark來維持,watermark能夠指定時間delay的範圍,在延遲窗口以外的數據是能夠丟棄的,在業務上晚到的數據也是沒有意義的。

下面是structuredstreaming的架構圖:

clipboard.png

這裏面就是把剛纔Sparkstreaming講exactly-once的步驟1,2,3都實現了,它本質上仍是分批的batch方式,offset本身維護,狀態存儲用的hdfs,對外的sink沒有作相似的冪等操做,也沒有寫完以後再去commit offset,它只是再保證容錯的同時去實現內部引擎的exactly-once。

4.1 特色

1) Stateful Processing SQL&DSL:能夠知足帶狀態的流計算

2) Real Multi-Stream Join:能夠經過Spark2.3實現多個流的join,多個流的join作法和Flink相似,你須要先定義兩個流的條件(主要是時間做爲一個條件),好比說有兩個topic的流進來,而後你但願經過某一個具體的schema中某個字段(一般是event time)來限定須要buffer的數據,這樣能夠實現真正意義上的流的join。

3)比較容易實現端到端的exactly-once的語義,只須要擴展sink的接口支持冪等操做是能夠實現exactly-once的。

特別說一下,Structuredstreaming和原生的streaming的api有一點區別,它create表的Dataframe的時候,是須要指定表的schema的,意味着你須要提早指定schema。另外它的watermark是不支持SQL的,因而咱們加了一個擴展,實現徹底寫sql,能夠從左邊到右邊的轉換(下圖),咱們但願用戶不止是程序員,也但願不會寫程序的數據分析師等同窗也能用到。

clipboard.png

4.2 總結

1) Trigger(Processing Time、 Continuous ):2.3以前主要基於processing Time,每一個批次的數據處理完了立馬觸發下一批次的計算。2.3推出了record by record的持續處理的trigger。

2)Continuous Processing (Only Map-Like Operations):目前它只支持map like的操做,同時sql的支持度也有些限制。

3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證須要本身作一些額外的擴展, 咱們發現kafka0.11版本提供了事務的功能,是能夠從基於這方面考慮從而去實現從source到引擎再到sink,真正意義上的端到端的exactly-once。

4) CEP(Drools):咱們發現有業務方須要提供cep 這樣復瑣事件處理的功能,目前咱們的語法沒法直接支持,咱們讓用戶使用規則引擎Drools,而後跑在每一個executor上面,依靠規則引擎功能去實現cep。

因而基於以上幾個Spark structuredstreaming的特色和缺點,咱們考慮使用Flink來作這些事情。

5.Flink

clipboard.png

Flink目標是對標Spark,流這塊是領先比較多,它野心也比較大,圖計算,機器學習等它都有,底層也是支持yarn,tez等。對於社區用的比較多的存儲,Flink社區官方都支持比較好,相對來講。

Flink的框架圖:

clipboard.png

Flink中的JobManager,至關於Spark的driver角色,taskManger至關於executor,裏面的task也有點相似Spark的那些task。 不過Flink用的rpc是akka,同時Flink core自定義了內存序列化框架,另外task無需像Spark每一個stage的task必須相互等待而是處理完後即往下游發送數據。

Flink binary data處理operator:

clipboard.png

Spark的序列化用戶通常會使用kryo或者java默認的序列化,同時也有Tungsten項目對Spark程序作一jvm層面以及代碼生成方面的優化。相對於Spark,Flink本身實現了基於內存的序列化框架,裏面維護着key和pointer的概念,它的key是連續存儲,在cpu層面會作一些優化,cache miss機率極低。比較和排序的時候不須要比較真正的數據,先經過這個key比較,只有當它相等的時候,纔會從內存中把這個數據反序列化出來,再去對比具體的數據,這是個不錯的性能優化點。

Flink task chain:

clipboard.png

Task中operatorchain,是比較好的概念。若是上下游數據分佈不須要從新shuffle的話,好比圖中source是kafka source,後面跟的map只是一個簡單的數據filter,咱們把它放在一個線程裏面,就能夠減小線程上下文切換的代價。

並行度概念

clipboard.png

好比說這裏面會有5個task,就會有幾個併發線程去跑,chain起來的話放在一個線程去跑就能夠提高數據傳輸性能。Spark是黑盒的,每一個operator沒法設併發度,而Flink能夠對每一個operator設併發度,這樣能夠更靈活一點,做業運行起來對資源利用率也更高一點。

Spark 通常經過Spark.default.parallelism來調整並行度,有shuffle操做的話,並行度通常是通Spark.sql.shuffle.partitions參數來調整,實時計算的話其實應該調小一點,好比咱們生產中和kafka的partition數調的差很少,batch在生產上會調得大一點,咱們設爲1000,左邊的圖咱們設併發度爲2,最大是10,這樣首先分2個併發去跑,另外根據key作一個分組的概念,最大分爲10組,就能夠作到把數據儘可能的打散。

State & Checkpoint

由於Flink的數據是一條條過來處理,因此Flink中的每條數據處理完了立馬發給下游,而不像spark,須要等該operator所在的stage全部的task都完成了再往下發。

Flink有粗粒度的checkpoint機制,以很是小的代價爲每一個元素賦予一個snapshot概念,只有當屬於本次snapshot的全部數據都進來後纔會觸發計算,計算完後,才把buffer數據往下發,目前Flink sql沒有提供控制buffer timeout的接口,即個人數據要buffer多久才往下發。能夠在構建Flink context時,指定buffer timeout爲0,處理完的數據纔會立馬發下去,不須要等達到必定閾值後再往下發。

Backend默認是維護在jobmanager內存,咱們更多使用的的是寫到hdfs上,每一個operator的狀態寫到rocksdb上,而後異步週期增量同步到外部存儲。

容錯

clipboard.png

圖中左半部分的紅色節點發生了failover,若是是at-least-once,則其最上游把數據重發一次就好;但若是是exactly-once,則須要每一個計算節點從上一次失敗的時機重放。

Exactly Once Two-Phase Commit

clipboard.png

Flink1.4以後有兩階段提交來支持exactly-once.它的概念是從上游kafka消費數據後,每一步都會發起一次投票,來記錄狀態,經過checkpoint的屏障來處理標記,只有最後再寫到kafka(0.11以後的版本),只有最後完成以後,纔會把每一步的狀態讓jobmanager中的cordinator去通知能夠固化下來,這樣實現exactly-once。

Savepoints
還有一點Flink比較好的就是,基於它的checkpoint來實現savepoint功能。業務方須要每一個應用恢復節點不同,但願恢復到的版本也是能夠指定的,這是比較好的。這個savepoint不僅是數據的恢復,也有計算狀態的恢復。

特色:

1) Trigger (Processing Time、 Event Time、IngestionTime):對比下,Flink支持的流式語義更豐富,不只支持Processing Time, 也支持Event time和Ingestion Time。

2)Continuous Processing & Window:支持純意義上的持續處理,recordby record的,window也比Spark處理的好。

3) Low End-To-End Latency With Exactly-Once Guarantees:由於有兩階段提交,用戶是能夠選擇在犧牲必定吞吐量的狀況下,根據業務需求狀況來調整來保證端到端的exactly-once。

4) CEP:支持得好。

5) Savepoints:能夠根據業務的需求作一些版本控制。

也有作的還很差的:

1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達到99%以上。 SQL函數不支持,目前還沒法對單個operator作並行度的設置。

2) ML、Graph等:機器學習,圖計算等其餘領域比Spark要弱一點,但社區也在着力持續改進這個問題。

本文做者:易偉平

閱讀原文

本文來自雲棲社區合做夥伴「阿里技術」,如需轉載請聯繫原做者。

相關文章
相關標籤/搜索