本文做者:易偉平(餓了麼)java
整理:姬平(阿里巴巴實時計算部)python
本文將爲你們展現餓了麼大數據平臺在實時計算方面所作的工做,以及計算引擎的演變之路,你能夠藉此瞭解Storm、Spark、Flink的優缺點。如何選擇一個合適的實時計算引擎?Flink憑藉何種優點成爲餓了麼首選?本文將帶你一一解開謎題。
下面是目前餓了麼平臺現狀架構圖:mysql
來源於多個數據源的數據寫到 kafka 裏,計算引擎主要是 Storm , Spark 和 Flink,計算引擎出來的結果數據再落地到各類存儲上。程序員
目前 Storm 任務大概有100多個,Spark任務有50個左右,Flink暫時還比較少。redis
目前咱們集羣規模天天數據量有60TB,計算次數有1000000000,節點有400個。這裏要提一下,Spark 和 Flink都是 on yarn 的,其中Flink onyarn主要是用做任務間 jobmanager 隔離, Storm 是 standalone 模式。sql
在講述咱們應用場景以前,先強調實時計算一個重要概念, 一致性語義:apache
1) at-most-once:即 fire and forget,咱們一般寫一個 java 的應用,不去考慮源頭的 offset 管理,也不去考慮下游的冪等性的話,就是簡單的at-most-once,數據來了,無論中間狀態怎樣,寫數據的狀態怎樣,也沒有ack機制。api
2) at-least-once: 重發機制,重發數據保證每條數據至少處理一次。性能優化
3) exactly-once: 使用粗 checkpoint 粒度控制來實現 exactly-once,咱們講的 exactly-once 大多數指計算引擎內的 exactly-once,即每一步的 operator 內部的狀態是否能夠重放;上一次的 job 若是掛了,可否從上一次的狀態順利恢復,沒有涉及到輸出到 sink 的冪等性概念。架構
4) at-least-one + idempotent = exactly-one:若是咱們能保證說下游有冪等性的操做,好比基於mysql實現 update on duplicate key;或者你用es, cassandra之類的話,能夠經過主鍵key去實現upset的語義, 保證at-least-once的同時,再加上冪等性就是exactly-once。
餓了麼早期都是使用Storm,16年以前仍是Storm,17年纔開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個概念:
1) 數據是 tuple-based
2) 毫秒級延遲
3) 主要支持java, 如今利用apache beam也支持python和go。
4) Sql的功能還不完備,咱們本身內部封裝了typhon,用戶只須要擴展咱們的一些接口,就可使用不少主要的功能;flux是Storm的一個比較好的工具,只須要寫一個yaml文件,就能夠描述一個Storm任務,某種程度上說知足了一些需求,但仍是要求用戶是會寫java的工程師,數據分析師就使用不了。
1) 易用性:由於使用門檻高,從而限制了它的推廣。
2)StateBackend:更多的須要外部存儲,好比redis之類的kv存儲。
3) 資源分配方面:用worker和slot提早設定的方式,另外因爲優化點作的較少,引擎吞吐量相對比較低一點。
有一天有個業務方過來提需求說 咱們能不能寫個sql,幾分鐘內就能夠發佈一個實時計算任務。 因而咱們開始作Sparkstreaming。它的主要概念以下:
1) Micro-batch:須要提早設定一個窗口,而後在窗口內處理數據。
2) 延遲是秒級級別,比較好的狀況是500ms左右。
3) 開發語言是java和scala。
4) Streaming SQL,主要是咱們的工做,咱們但願提供 Streaming SQL 的平臺。
特色:
1) Spark生態和 SparkSQL: 這是Spark比較好的地方,技術棧是統一的,SQL,圖計算,machine learning的包都是能夠互調的。由於它先作的是批處理,和Flink不同,因此它自然的實時和離線的api是統一的。
2) Checkpointon hdfs。
3) On Yarn:Spark是屬於 hadoop 生態體系,和 yarn 集成度高。
4) 高吞吐: 由於它是 micro-batch 的方式,吞吐也是比較高的。
下面給你們大體展現一下咱們平臺用戶快速發佈一個實時任務的操做頁面,它須要哪些步驟。咱們這裏不是寫 DDL 和 DML 語句,而是 UI 展現頁面的方式。
頁面裏面會讓用戶選一些必要的參數, 首先會選哪個 kafka 集羣,每一個分區消費多少,反壓也是默認開啓的。消費位置須要讓用戶每次去指定,有可能用戶下一次重寫實時任務的時候,能夠根據業務需求去選擇offset消費點。
中間就是讓用戶描述 pipeline。 SQL 就是 kafka 的多個 topic,輸出選擇一個輸出表,SQL 把上面消費的 kafka DStream 註冊成表,而後寫一串 pipeline,最後咱們幫用戶封裝了一些對外 sink (剛剛提到的各類存儲都支持,若是存儲能實現 upsert 語義的話,咱們都是支持了的)。
雖然剛剛知足通常無狀態批次內的計算要求,但就有用戶想說, 我想作流的 join 怎麼辦, 早期的 Spark1.5 能夠參考 Spark-streamingsql 這個開源項目把 DStream 註冊爲一個表,而後對這個表作 join 的操做,但這隻支持1.5以前的版本,Spark2.0 推出 structured streaming 以後項目就廢棄了。咱們有一個 tricky 的方式:
讓 Sparkstreaming 去消費多個 topic,可是我根據一些條件把消費的 DStream 裏面的每一個批次 RDD 轉化爲DataFrame,這樣就能夠註冊爲一張表,根據特定的條件,切分爲兩張表,就能夠簡單的作個 join,這個 join 的問題徹底依賴於本次消費的數據,它們 join 的條件是不可控的,是比較 tricky 的方式。好比說下面這個例子,消費兩個 topic,而後簡單經過 filer 條件,拆成兩個表,而後就能夠作個兩張表的 join,但它本質是一個流。
exactly-once 須要特別注意一個點:
咱們必需要求數據 sink 到外部存儲後,offset 才能 commit,不論是到 zookeeper,仍是 mysql 裏面,你最好保證它在一個 transaction 裏面,並且必須在輸出到外部存儲(這裏最好保證一個 upsert 語義,根據 unique key 來實現upset語義)以後,而後這邊源頭driver再根據存儲的 offeset 去產生 kafka RDD,executor 再根據 kafka 每一個分區的 offset 去消費數據。若是知足這些條件,就能夠實現端到端的 exactly-once 這是一個大前提。
1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):咱們要實現跨批次帶狀態的計算的話,在1.X版本,咱們經過這兩個接口去作,但仍是須要把這個狀態存到 hdfs 或者外部去,實現起來比較麻煩一點。
2) Real Multi-Stream Join:沒辦法實現真正的多個流join的語義。
3) End-To-End Exactly-Once Semantics:它的端到端的 exactly-once 語義實現起來比較麻煩,須要sink到外部存儲後還須要手動的在事務裏面提交offset。
咱們調研而後並去使用了 Spark2.X 以後帶狀態的增量計算。下面這個圖是官方網站的:
全部的流計算都參照了 Google 的 data flow,裏面有個重要的概念:數據的 processing time 和 event time,即數據的處理時間和真正的發生時間有個 gap 。因而流計算領域還有個 watermark,當前進來的事件水位須要watermark 來維持,watermark 能夠指定時間 delay 的範圍,在延遲窗口以外的數據是能夠丟棄的,在業務上晚到的數據也是沒有意義的。
下面是 structured streaming 的架構圖:
這裏面就是把剛纔 sparkstreaming 講 exactly-once 的步驟1,2,3都實現了,它本質上仍是分批的 batch 方式,offset 本身維護,狀態存儲用的 hdfs,對外的 sink 沒有作相似的冪等操做,也沒有寫完以後再去 commit offset,它只是再保證容錯的同時去實現內部引擎的 exactly-once。
1) Stateful Processing SQL&DSL:能夠知足帶狀態的流計算
2) Real Multi-Stream Join:能夠經過 Spark2.3 實現多個流的 join,多個流的 join 作法和 Flink 相似,你須要先定義兩個流的條件(主要是時間做爲一個條件),好比說有兩個topic的流進來,而後你但願經過某一個具體的 schema 中某個字段(一般是 event time)來限定須要 buffer 的數據,這樣能夠實現真正意義上的流的 join。
3)比較容易實現端到端的 exactly-once 的語義,只須要擴展sink的接口支持冪等操做是能夠實現 exactly-once的。
特別說一下,structured streaming 和原生的 streaming 的 API 有一點區別,它建立表的 Dataframe 的時候,是須要指定表的 schema 的,意味着你須要提早指定 schema。另外它的 watermark 是不支持 SQL 的,因而咱們加了一個擴展,實現徹底寫 SQL,能夠從左邊到右邊的轉換(下圖),咱們但願用戶不止是程序員,也但願不會寫程序的數據分析師等同窗也能用到。
1) Trigger(Processing Time、 Continuous ):2.3以前主要基於processing Time,每一個批次的數據處理完了立馬觸發下一批次的計算。2.3推出了record by record的持續處理的trigger。
2) Continuous Processing (Only Map-Like Operations):目前它只支持map like的操做,同時sql的支持度也有些限制。
3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證須要本身作一些額外的擴展, 咱們發現kafka0.11版本提供了事務的功能,是能夠從基於這方面考慮從而去實現從source到引擎再到sink,真正意義上的端到端的exactly-once。
4) CEP(Drools):咱們發現有業務方須要提供 CEP 這樣復瑣事件處理的功能,目前咱們的語法沒法直接支持,咱們讓用戶使用規則引擎 Drools,而後跑在每一個 executor 上面,依靠規則引擎功能去實現 CEP。
因而基於以上幾個 Spark Structured Streaming 的特色和缺點,咱們考慮使用 Flink 來作這些事情。
Flink 目標是對標 Spark,流這塊是領先比較多,它野心也比較大,圖計算,機器學習等它都有,底層也是支持yarn,tez等。對於社區用的比較多的存儲,Flink 社區官方都支持比較好,相對來講。
Flink 的框架圖:
Flink中的 JobManager,至關於 Spark 的 Driver 角色,TaskManger 至關於 Executor,裏面的 Task 也有點相似Spark 的那些 Task。 不過 Flink 用的 RPC 是 akka,同時 Flink Core 自定義了內存序列化框架,另外 Task 無需像Spark 每一個 Stage 的 Task 必須相互等待而是處理完後即往下游發送數據。
Flink binary data處理operator:
Spark 的序列化用戶通常會使用 kryo 或者 java 默認的序列化,同時也有 Tungsten 項目對 Spark 程序作一 JVM 層面以及代碼生成方面的優化。相對於 Spark,Flink本身實現了基於內存的序列化框架,裏面維護着key和pointer 的概念,它的 key 是連續存儲,在 CPU 層面會作一些優化,cache miss 機率極低。比較和排序的時候不須要比較真正的數據,先經過這個 key 比較,只有當它相等的時候,纔會從內存中把這個數據反序列化出來,再去對比具體的數據,這是個不錯的性能優化點。
Flink Task Chain:
Task中 operator chain,是比較好的概念。若是上下游數據分佈不須要從新 shuffle 的話,好比圖中 source 是kafka source,後面跟的 map 只是一個簡單的數據 filter,咱們把它放在一個線程裏面,就能夠減小線程上下文切換的代價。
並行度概念
好比說這裏面會有 5 個 Task,就會有幾個併發線程去跑,chain 起來的話放在一個線程去跑就能夠提高數據傳輸性能。Spark 是黑盒的,每一個 operator 沒法設併發度,而 Flink 能夠對每一個 operator 設併發度,這樣能夠更靈活一點,做業運行起來對資源利用率也更高一點。
Spark 通常經過 Spark.default.parallelism 來調整並行度,有 shuffle 操做的話,並行度通常是通Spark.sql.shuffle.partitions 參數來調整,實時計算的話其實應該調小一點,好比咱們生產中和 kafka 的 partition 數調的差很少,batch 在生產上會調得大一點,咱們設爲1000,左邊的圖咱們設併發度爲2,最大是10,這樣首先分2個併發去跑,另外根據 key 作一個分組的概念,最大分爲10組,就能夠作到把數據儘可能的打散。
State & Checkpoint
由於 Flink 的數據是一條條過來處理,因此 Flink 中的每條數據處理完了立馬發給下游,而不像 spark,須要等該operator 所在的 stage 全部的 task 都完成了再往下發。
Flink 有粗粒度的 checkpoint 機制,以很是小的代價爲每一個元素賦予一個 snapshot 概念,只有當屬於本次snapshot 的全部數據都進來後纔會觸發計算,計算完後,才把 buffer 數據往下發,目前 Flink sql 沒有提供控制buffer timeout 的接口,即個人數據要buffer多久才往下發。能夠在構建 Flink context 時,指定 buffer timeout爲 0,處理完的數據纔會立馬發下去,不須要等達到必定閾值後再往下發。
Backend 默認是維護在 jobmanager 內存,咱們更多使用的的是寫到 hdfs 上,每一個 operator 的狀態寫到 rocksdb 上,而後異步週期增量同步到外部存儲。
容錯
圖中左半部分的紅色節點發生了 failover,若是是 at-least-once,則其最上游把數據重發一次就好;但若是是exactly-once,則須要每一個計算節點從上一次失敗的時機重放。
Exactly Once Two-Phase Commit
Flink1.4 以後有兩階段提交來支持 exactly-once 。它的概念是從上游 kafka 消費數據後,每一步都會發起一次投票,來記錄狀態,經過checkpoint的屏障來處理標記,只有最後再寫到kafka(0.11以後的版本),只有最後完成以後,纔會把每一步的狀態讓 jobmanager 中的 cordinator 去通知能夠固化下來,這樣實現 exactly-once。
Savepoints
還有一點 Flink 比較好的就是,基於它的 checkpoint 來實現 savepoint 功能。業務方須要每一個應用恢復節點不同,但願恢復到的版本也是能夠指定的,這是比較好的。這個 savepoint 不僅是數據的恢復,也有計算狀態的恢復。
特色:
1) Trigger (Processing Time、 Event Time、IngestionTime):對比下,Flink支持的流式語義更豐富,不只支持 Processing Time, 也支持 Event time 和 Ingestion Time。
2)Continuous Processing & Window:支持純意義上的持續處理,record by record的,window 也比 Spark處理的好。
3) Low End-To-End Latency With Exactly-Once Guarantees:由於有兩階段提交,用戶是能夠選擇在犧牲必定吞吐量的狀況下,根據業務需求狀況來調整來保證端到端的exactly-once。
4) CEP:支持得好。
5) Savepoints:能夠根據業務的需求作一些版本控制。
也有作的還很差的:
1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達到99%以上。 SQL函數不支持,目前還沒法對單個operator作並行度的設置。
2) ML、Graph等:機器學習,圖計算等其餘領域比Spark要弱一點,但社區也在着力持續改進這個問題。
由於如今餓了麼已經屬於阿里的一員,後續會更多地使用 Flink,也期待用到 Blink。
更多資訊請訪問 Apache Flink 中文社區網站