快手基於 Apache Flink 的優化實踐

 

本次由快手劉建剛老師分享,內容主要分爲三部分。首先介紹流式計算的基本概念, 而後介紹 Flink 的關鍵技術,最後講講 Flink 在快手生產實踐中的一些應用,包括實時指標計算和快速 failover。數據庫

1、流式計算的介紹

流式計算主要針對 unbounded data(***數據流)進行實時的計算,將計算結果快速的輸出或者修正。apache

這部分將分爲三個小節來介紹。第一,介紹大數據系統發展史,包括初始的批處理到如今比較成熟的流計算;第二,爲你們簡單對比下批處理和流處理的區別;第三,介紹流式計算裏面的關鍵問題,這是每一個優秀的流式計算引擎所必須面臨的問題。緩存

一、大數據系統發展史架構

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

上圖是 2003 年到 2018 年大數據系統的發展史,看看是怎麼一步步走到流式計算的。併發

2003 年,Google 的 MapReduce 橫空出世,經過經典的 Map&Reduce 定義和系統容錯等保障來方便處理各類大數據。很快就到了 Hadoop,被認爲是開源版的 MapReduce, 帶動了整個apache開源社區的繁榮。再日後是谷歌的 Flume,經過算子鏈接等 pipeline 的方式解決了多個 MapReduce 做業鏈接處理低效的問題。分佈式

流式系統的開始以 Storm 來介紹。Storm 在2011年出現, 具有延時短、性能高等特性, 在當時頗受喜好。可是 Storm 沒有提供系統級別的 failover 機制,沒法保障數據一致性。那時的流式計算引擎是不精確的,lamda 架構組裝了流處理的實時性和批處理的準確性,曾經風靡一時,後來由於難以維護也逐漸沒落。ide

接下來出現的是 Spark Streaming,能夠說是第一個生產級別的流式計算引擎。Spark Streaming 早期的實現基於成熟的批處理,經過 mini batch 來實現流計算,在 failover 時可以保障數據的一致性。函數

Google 在流式計算方面有不少探索,包括 MillWheel、Cloud Dataflow、Beam,提出了不少流式計算的理念,對其餘的流式計算引擎影響很大。oop

再來看 Kafka。Kafka 並不是流式計算引擎,可是對流式計算影響特別大。Kafka 基於log 機制、經過 partition 來保存實時數據,同時也能存儲很長時間的歷史數據。流式計算引擎能夠無縫地與kafka進行對接,一旦出現 Failover,能夠利用 Kafka 進行數據回溯,保證數據不丟失。另外,Kafka 對 table 和 stream 的探索特別多,對流式計算影響巨大。性能

Flink 的出現也比較久,一直到 2016 年左右才火起來的。Flink 借鑑了不少 Google 的流式計算概念,使得它在市場上特別具備競爭力。後面我會詳細介紹 Flink 的一些特色。

二、批處理與流計算的區別

批處理和流計算有什麼樣的區別,這是不少同窗有疑問的地方。咱們知道 MapReduce 是一個批處理引擎,Flink 是一個流處理引擎。咱們從四個方面來進行一下對比:

1)使用場景

MapReduce 是大批量文件處理,這些文件都是 bounded data,也就是說你知道這個文件何時會結束。相比而言,Flink 處理的是實時的 unbounded data,數據源源不斷,可能永遠都不會結束,這就給數據完備性和 failover 帶來了很大的挑戰。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

2)容錯

MapReduce 的容錯手段包括數據落盤、重複讀取、最終結果可見等。文件落盤能夠有效保存中間結果,一旦 task 掛掉重啓就能夠直接讀取磁盤數據,只有做業成功運行完了,最終結果纔對用戶可見。這種設計的哲理就是你能夠經過重複讀取同一份數據來產生一樣的結果,能夠很好的處理 failover。

Flink 的容錯主要經過按期快照和數據回溯。每隔一段時間,Flink就會插入一些 barrier,barrier 從 source 流動到 sink,經過 barrier 流動來控制快照的生成。快照製做完就能夠保存在共享引擎裏。一旦做業出現問題,就能夠從上次快照進行恢復,經過數據回溯來從新消費。

3)性能

MapReduce 主要特色是高吞吐、高延時。高吞吐說明處理的數據量很是大;高延時就是前面說到的容錯問題,它必須把整個做業處理完纔對用戶可見。

Flink 主要特色是高吞吐、低延時。在流式系統裏,Flink 的吞吐是很高的。同時,它也能夠作到實時處理和輸出,讓用戶快速看到結果。

4)計算過程

MapReduce 主要經過 Map 和 reduce 來計算。Map 負責讀取數據並做基本的處理, reduce 負責數據的聚合。用戶能夠根據這兩種基本算子,組合出各類各樣的計算邏輯。

Flink 爲用戶提供了 pipeline 的 API 和批流統一的 SQL。經過 pipeline 的 API, 用戶能夠方便地組合各類算子構建複雜的應用;Flink SQL 是一個更高層的 API 抽象,極大地下降了用戶的使用門檻。

三、流式計算的關鍵問題

這部分主要經過四個問題給你們解答流式計算的關鍵問題,也是不少計算引擎須要考慮的問題。

1)What

What 是指經過什麼樣的算子來進行計算。主要包含三個方面的類型,element-wise 表示一對一的計算,aggregating 表示聚合操做,composite 表示多對多的計算。

2)Where

aggregating 會進行一些聚合的計算, 主要是在各類 window 裏進行計算。窗口包含滑動窗口、滾動窗口、會話窗口。窗口會把***的數據切分紅有界的一個個數據塊進行處理,後面咱們會詳細介紹這點。

3)When

When 就是何時觸發計算。窗口裏面有數據,因爲輸入數據是無窮無盡的,很難知道一個窗口的數據是否所有到達了。流式計算主要經過 watermark 來保障數據的完備性,經過 trigger 來決定什麼時候觸發。當接收到數值爲 X 的 Watermark 時,能夠認爲全部時間戳小於等於X的事件所有到達了。一旦 watermark 跨過窗口結束時間,就能夠經過 trigger 來觸發計算並輸出結果。

4)How

How 主要指咱們如何從新定義同一窗口的屢次觸發結果。前面也說了 trigger 是用來觸發窗口的, 一個窗口可能會被觸發屢次,好比1分鐘的窗口每 10 秒觸發計算一次。處理方式主要包含三種:

  • Discarding,丟棄以前的狀態從新計算。這種方式每次的觸發結果都是互不關聯的,屢次觸發結果的組合反映了所有的窗口內容,下游通常會再次聚合;
  • Accumulating,這個就是一個聚合的狀態,好比說第二次觸發的時候是在第一次的結果上進行計算的,下游只須要保存最新的結果便可;
  • Accumulating 和 retracting,這個主要在 Accumulating 的基礎上加了一個 retracting,retracting 的意思就是撤銷。窗口再次觸發時,會告訴下游撤銷上一次的計算結果,並告知最新的結果。Flink SQL 的聚合就使用了這種 retract的模式。

2、Flink 關鍵技術

一、Flink 簡介

Flink 是一款分佈式計算引擎, 既能夠進行流式計算,也能夠進行批處理。下圖是官網對 Flink 的介紹:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

Flink 能夠運行在 k8s、yarn、mesos 等資源調度平臺上,依賴 hdfs 等文件系統,輸入包含事件和各類其餘數據,通過 Flink 引擎計算後再輸出到其餘中間件或者數據庫等。

Flink 有兩個核心概念:

  • State:Flink 能夠處理有狀態的數據,經過自身的 state 機制來保障做業failover時數據不丟失;
  • Event Time:容許用戶按照事件時間來處理數據,經過 watermark 來推進時間前進,這個後面還會詳細介紹。主要是系統的時間和事件的時間。

Flink 主要經過上面兩個核心技術來保證 exactly-once, 好比說做業 Failover 的時候狀態不丟失,就好像沒發生故障同樣。

二、快照機制

Flink 的快照機制主要是爲了保障做業 failover 時不丟失狀態。Flink 提供了一種輕量級的快照機制,不須要中止做業就能夠幫助用戶持久化內存中的狀態數據。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

上圖中的 markers(與 barrier 語義相同)經過流動來觸發快照的製做,每個編號都表明了一次快照,好比編號爲 n 的 markers 從最上游流動到最下游就表明了一次快照的製做過程。簡述以下:

  • 系統發送編號爲 n 的 markers 到最上游的算子,markers 隨着數據往下游流動;
  • 當下遊算子收到 marker 後,就開始將自身的狀態保存到共享存儲中;
  • 當全部最下游的算子接收到 marker 並完成算子快照後,本次做業的快照製做完成。

一旦做業失敗,重啓時就能夠從快照恢復。

下面爲一個簡單的 demo 說明(barrier 等同於 marker)。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

  • barrier 到達 Source,將狀態 offset=7 存儲到共享存儲;
  • barrier 到達 Task,將狀態 sum=21 存儲到共享存儲;
  • barrier 到達 Sink,commit 本次快照,標誌着快照的成功製做。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

這時候忽然間做業也掛掉, 重啓時 Flink 會經過快照恢復各個狀態。Source 會將自身的 offset 置爲 7,Task 會將自身的 sum 置爲 21。如今咱們能夠認爲 一、二、三、四、五、6 這 6 個數字的加和結果並無丟失。這個時候,offset 從 7 開始消費,跟做業失敗前徹底對接了起來,確保了 exactly-once。

三、事件時間

時間類型分爲兩種:

  • Event time(事件時間),指事件發生的時間,好比採集數據時的時間;
  • Processing time(系統時間),指系統的時間,好比處理數據時的時間。

若是你對數據的準確性要求比較高的話,採用 Event time 能保障 exactly-once。Processing Time 通常用於實時消費、精準性要求略低的場景,主要是由於時間生成不是 deterministic。

咱們能夠看下面的關係圖, X 軸是 Event time,Y 軸是 Processing time。理想狀況下 Event time 和 Processing time 是相同的,就是說只要有一個事件發生,就能夠馬上處理。可是實際場景中,事件發生後每每會通過必定延時纔會被處理,這樣就會致使咱們系統的時間每每會滯後於事件時間。這裏它們兩個的差 Processing-time lag 表示咱們處理事件的延時。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

事件時間經常使用在窗口中,使用 watermark 來確保數據完備性,好比說 watermarker 值大於 window 末尾時間時,咱們就能夠認爲 window 窗口全部數據都已經到達了,就能夠觸發計算了。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

好比上面 [0-10] 的窗口,如今 watermark 走到了 10,已經到達了窗口的結束,觸發計算 SUM=21。若是要是想對遲到的數據再進行觸發,能夠再定義一下後面 late data 的觸發,好比說後面來了個 9,咱們的 SUM 就等於 30。

四、窗口機制

窗口機制就是把***的數據分紅數據塊來進行計算,主要有三種窗口。

  • 滾動窗口:固定大小的窗口,相鄰窗口沒有交集;
  • 滑動窗口:每一個窗口的大小是同樣的,可是兩個窗口之間會有重合;
  • 會話窗口:根據活躍時間聚合而成的窗口, 好比活躍時間超過3分鐘新起一個窗口。窗口之間留有必定的間隔。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

窗口會自動管理狀態和觸發計算,Flink 提供了豐富的窗口函數來進行計算。主要包括如下兩種:

  • ProcessWindowFunction,全量計算會把全部數據緩存到狀態裏,一直到窗口結束時統一計算。相對來講,狀態會比較大,計算效率也會低一些;
  • AggregateFunction,增量計算就是來一條數據就算一條,可能咱們的狀態就會特別的小,計算效率也會比 ProcessWindowFunction 高不少,可是若是狀態存儲在磁盤頻繁訪問狀態可能會影響性能。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

3、快手 Flink 實踐

一、應用歸納

快手應用歸納主要是分爲數據接入、Flink 實時計算、數據應用、數據展現四個部分。各層各司其職、銜接流暢,爲用戶提供一體化的數據服務流程。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

二、實時指標計算

常見的實時指標計算包括 uv、pv 和 sum。這其中 uv 的計算最爲複雜也最爲經典。下面我將重點介紹 uv。

uv 指的是不一樣用戶的個數,咱們這邊計算的就是不一樣 deviceld 的個數,主要的挑戰來自三方面:

  • 用戶數多,數據量大。活動期間的 QPS 常常在千萬級別,實際計算起來特別複雜;
  • 實時性要求高,一般爲幾秒到分鐘結果的輸出;
  • 穩定性要求高,好比說咱們在作春晚活動時候要求故障時間須要低於2%或更少。

針對各類各樣的 uv 計算,咱們提供了一套成熟的計算流程。主要包含了三方面:

  • 字典方案:將 string 類型的 deviceld 轉成 long 類型,方便後續的 uv 計算;
  • 傾斜處理:好比某些大 V 會致使數據嚴重傾斜,這時候就須要打散處理;
  • 增量計算:好比計算 1 天的 uv,每分鐘輸出一次結果。

字典方案須要確保任何兩個不一樣的 deviceId 不能映射到相同的 long 類型數字上。快手內部主要使用過如下三種方案:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

  • HBase, 基於 partition 分區創建 deviceld 到 id 的映射, 經過緩存和批量訪問來加速;
  • Redis, 這種方案嚴格來講不屬於字典,主要經過 key-value 來判斷數據是否首次出現,基於首次數據來計算 uv,這樣就會把 pv 和 uv 的計算進行統一;
  • 最後就是一個 Flink 內部自建的全局字典實現 deviceld 到 id 的轉換,以後計算UV。

這三種方案裏面,前兩種屬於外部存儲的字典方案,優勢是能夠作到多個做業共享 1 份數據, 缺點是外部訪問慢並且不太穩定。最後一種 Flink 字典方案基於 state,不依賴外部存儲, 性能高可是沒法多做業共享。

接下來咱們重點介紹基於Flink自身的字典方案,下圖主要是創建一個 deviceld 到 id 的映射:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

主要分紅三步走:

1)創建 Partition 分區, 指定一個比較大的 Partition 分區個數,該個數比較大而且不會變,根據 deviceld 的哈希值將其映射到指定 partition。

2)創建 id 映射。每一個 Partition 都有本身負責的 id 區間,確保 Partition 之間的long 類型的 id 不重複, partition 內部經過自增 id 來確保每一個 deviceId 對應一個 id。

3)使用 keyed state 保存 id 映射。這樣咱們的做業出現併發的大改變時,能夠方便的 rescale,不須要作其餘的操做。

除了 id 轉換,後面就是一個實時指標計算的常見問題,就是數據傾斜。業界常見的解決數據傾斜處理方案主要是兩種:

  • 打散再聚合:先將傾斜的數據打散計算,而後再聚合計算結果;
  • Local-aggregate:先在本地計算預聚合,這樣會大大減小下游的數據壓力。

兩者的本質是同樣的,都是先預聚合再彙總,從而避免單點性能問題。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

上圖爲計算最小值的熱點問題,紅色數據爲熱點數據。若是直接將它們打到同一個分區,會出現性能問題。爲了解決傾斜問題,咱們經過hash策略將數據分紅小的 partition 來計算,如上圖的預計算,最後再將中間結果彙總計算。

當一切就緒後,咱們來作增量的 UV 計算,好比計算 1 天 uv,每分鐘輸出 1 次結果。計算方式既能夠採用 API,也能夠採用 SQL。

針對 API,咱們選擇了 global state+bitmap 的組合,既嚴格遵循了 Event Time 又減小了 state 大小:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

下面爲計算流程(須要注意時區問題):

  • 定義跟觸發間隔同樣大小的 window(好比 1 分鐘);
  • Global state 用來保存跨窗口的狀態,咱們採用 bitmap 來存儲狀態;
  • 每隔一個 window 觸發一次,輸出起始至今的 UV;
  • 當前做用域(好比 1 天)結束,清空狀態從新開始。

針對 SQL,增量計算支持的還不是那麼完善,可是能夠利用 early-fire 的參數來提早觸發窗口。

配置以下:

table.exec.emit.early-fire.enabled:
truetable.exec.emit.early-fire.delay:60 s

early-fire.delay 就是每分鐘輸出一次結果的意思。

SQL 以下:

SELECT TUMBLE_ROWTIME(eventTime, interval ‘1’ day) AS rowtime, dimension, count(distinct id) as uv
FROM person
GROUP BY TUMBLE(eventTime, interval '1' day), dimension

若是遇到傾斜,能夠參考上一步來處理。

三、快速 failover

最後看下咱們部門最近發力的一個方向,如何快速 failover。

Flink 做業都是 long-running 的在線做業,不少對可用性的要求特別高,尤爲是跟公司核心業務相關的做業,SLA 要求 4 個 9 甚至更高。看成業遇到故障時,如何快速恢復對咱們來講是一個巨大的挑戰。

下面分三個方面來展開:

  • Flink 當前已有的快速恢復方案;
  • 基於 container 宕掉的快速恢復;
  • 基於機器宕掉的快速恢復。

1)Flink 當前已有的快速恢復方案

Flink 當前已有的快速恢復方案主要包括如下兩種:

  • region failover。若是流式做業的 DAG 包含多個子圖或者 pipeline,那麼 task 失敗時只會影響其所屬的子圖或者 pipeline ,而不用整個 DAG 都從新啓動;
  • local recovery。在 Flink 將快照同步到共享存儲的同時,在本地磁盤也保存一份快照。做業失敗恢復時,能夠調度到上次部署的位置,並從 local disk 進行快照恢復。

2)基於 container 宕掉的快速恢復

實際環境中, container 宕掉再申請有時會長達幾十秒,好比由於 hdfs 慢、yarn 慢等緣由,嚴重影響恢復速度。爲此,咱們作了以下優化:

  • 冗餘資源。維持固定個數的冗餘 container,一旦 container 宕掉,冗餘 container 馬上候補上來,省去了繁雜的資源申請流程;
  • 提早申請。一旦發現做業由於 container 宕掉而失敗,馬上申請新的 container 。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

以上優化覆蓋了很大一部分場景,恢復時間從 30s-60s 降到 20s 之內。

3)基於機器宕掉的快速恢復

機器宕掉時,flink on yarn 的恢復時間超過 3 分鐘,這對重要做業顯然是沒法容忍的!爲了作到快速恢復,咱們須要作到快速感知和恢復:

  • 冗餘資源並打散分配,確保兩個冗餘資源不在一個 container,redundantContainerNum=max(containerNumOfHost) + 1;
  • 做業宕機,Hawk 監測系統 5 秒內發現;
  • 冗餘資源快速候補,免去申請資源的流程。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

經過這種方案,咱們能夠容忍任意一臺機器的宕機,並將宕機恢復時間由原先的 3 分鐘下降到 30 秒之內。

4、總結

本文從大數據系統的發展入手,進而延伸出流式系統的關鍵概念,以後介紹了 Flink的關鍵特性,最後講解了快手內部的實時指標計算和快速 failover,但願對你們有所幫助。

5、Q&A

Q1:打算作實時計算,能夠跳過 Storm、Spark 直接上手 Flink 嗎?

A:能夠直接使用 Flink。Storm 在 failover 時會丟失數據,沒法作到 exactly-once;spark streaming 是 Flink 的競爭者,是在批處理的基礎上實現流計算,相比而言,Flink 的底層是流處理,更加適合流計算。

Q2:通常怎麼處理 taskmanager heartbeat timeout?

A:默認 10 秒彙報一次心跳,心跳超時爲 50 秒,這個時候做業會失敗,若是配置了高可用那麼會重啓。

Q3:如何保證 2 天大時間跨度延遲消息的窗口計算?

A:這裏主要的挑戰在於時間長、狀態大,建議 stateBakend 使用 Rocksdb(能夠利用磁盤存儲大狀態),窗口計算建議使用增量計算來減小狀態的大小。

Q4:Flink on Yarn,Yarn 重啓會自動拉起 Flink 任務嗎,說不能拉起怎麼處理,手動啓動嗎?

A:若是配置了高可用(依賴 zookeeper),做業失敗了就能夠自動拉起。

Q5:Kafka 目前多用做數據中轉平臺,Flink 至關於替代了 Kafka Stream 嗎?

A:Kafka的核心功能是消息中間件,kafka stream 能夠跟 kafka 很好的集成,但並非一個專業的計算引擎。相比而言,flink 是一個分佈式的流式計算引擎,功能上更增強大。

Q6:大家怎麼看待 Apache Beam?

A:Apache Beam 在上層進行了抽象,能夠類比 SQL,只定義規範,底層能夠接入各類計算引擎。

相關文章
相關標籤/搜索