簡介: 5 月 22 日北京站 Flink Meetup 分享的議題。前端
本文整理自愛奇藝技術經理韓紅根在 5 月 22 日北京站 Flink Meetup 分享的議題《Flink 在愛奇藝廣告業務的實踐》,內容包括:算法
- 業務場景
- 業務實踐
- Flink 使用過程當中的問題及解決
- 將來規劃
實時數據在廣告業務的使用場景主要能夠分爲四個方面:數據庫
業務實踐主要分爲兩類,第一個是實時數倉,第二個是特徵工程。markdown
實時數倉的目標包括數據完整性、服務穩定性和查詢能力。架構
上圖爲廣告數據平臺基礎架構圖,從下往上看:併發
中間是數據生產的部分,數據生產的底層是大數據的基礎設施,這部分由公司的一個雲平臺團隊提供,其中包含 Spark / Flink 計算引擎,Babel 統一的管理平臺。Talos 是實時數倉服務,RAP 和 OLAP 對應不一樣的實時分析以及 OLAP 存儲和查詢服務。工具
數據生產的中間層是廣告團隊包含的一些服務,例如在生產裏比較典型的離線計算和實時計算。性能
再上層是統一查詢服務,咱們會封裝不少接口進行查詢。測試
數據生產鏈路是從時間粒度來說的,咱們最開始是離線數倉鏈路,在最底層的這一行,隨着實時化需求推動,就產生了一個實時鏈路,整理來講,是一個典型的 Lambda 架構。大數據
另外,咱們的一些核心指標,好比計費指標,由於它的穩定性對下游比較關鍵,因此咱們這邊採用異路多活。異路多活是源端日誌產生以後,在計算層和下游存儲層作了徹底的冗餘,在後面的查詢裏作統一處理。
上文介紹了咱們要求提供出去的實時數據的指標是穩定不變的,進度服務實現的核心點包括時間窗口裏指標的變化趨勢,同時結合了實時計算任務自己的狀態,由於在實時數倉裏,不少指標是基於時間窗口作聚合計算。
好比一個實時指標,咱們輸出的指標是 3 分鐘,也就是說 4:00 這個時間點的指標的就包括了 4:00~4:03 的數據,4:03 包括了 4:03~4:06 的數據,其實就是指一個時間窗口的數據,何時是對外可見的。由於在實時計算裏,數據不斷進來, 4:00 的時間窗口的數據從 4:00 開始,指標就已經開始產生了。隨着時間疊加,指標不斷上升,最後趨於穩定。咱們基於時間窗口指標的變化率,來判斷它是否趨於穩定。
但若是隻是基於這個點來看,那麼它還存在必定的弊端。
由於這個結果表的計算鏈會依賴不少個計算任務,若是這個鏈路上面哪一個任務出現問題,可能會致使當前的指標雖然走勢已經趨於正常,可是最終並不完整。因此在這基礎之上,咱們又引入了實時計算任務狀態,在指標趨於穩定的時候,同時去看生產鏈路上這些計算任務是否正常,若是是正常的話,表示任務自己時間點的指標已經穩定,能夠對外提供服務。
若是計算有卡頓、堆積,或者已經有異常在重啓過程當中,就須要繼續等待迭代處理。
上圖爲查詢服務架構圖。
最下方是數據,裏面有實時存儲引擎,包括 Druid 等。在離線中,數據在 Hive 裏邊,可是在作查詢的時候,會把它們進行 OLAP 的同步,在這邊使用的是兩種引擎。爲了和 Kudu 作 union 查詢,會把它同步到 OLAP 引擎,而後上面去統一使用 Impala 作查詢。另外,對於使用場景裏比較固定的方式,能夠導到 Kylin 裏,而後在上面作數據分析。
基於這些數據,會有多個查詢節點,再上面是一個智能路由層。從最上面查詢網關,當有一個查詢請求進來,首先判斷它是否是一個複雜場景。好比在一個查詢裏,若是它的時長同時跨越了離線和實時,這裏就會同時使用到離線表和實時表。
另外,離線表裏還有更復雜的選表邏輯,好比小時級別,天級別。通過複雜場景分析以後,就會把最終選擇的表大概肯定下來。其實在作智能路由的時候,纔會去參考左邊的一些基礎服務,好比元數據管理,當前這些表的進度到哪一個點了。
對於查詢性能的優化,在數據裏,底層掃描的數據量對最終性能的影響是很是大的。因此會有一個報表降維,根據歷史的查詢去作分析。好比在一個降維表包含哪些維度,能夠覆蓋到百分之多少的查詢。
以前在實時數據報表生產裏提到,它主要是基於 API 的方式實現的。Lambda 架構自己有一個問題就是實時跟離線是兩個計算團隊,對於同一個需求,須要兩個團隊同時去開發,這樣會帶來幾個問題。
所以咱們的訴求是流批一體,思考在計算層是否可使用一個邏輯來表示同一個業務需求,好比能夠同時使用流或者批的計算引擎來達到計算的效果。
在這個鏈路裏邊,原始數據經過 Kafka 的方式接入進來,通過統一的 ETL 邏輯,接着把數據放在數據湖裏。由於數據湖自己能夠同時支持流和批的方式進行讀寫,並且數據湖自己能夠實時消費,因此它既能夠作實時計算,也能夠作離線計算,而後統一把數據再寫回數據湖。
前文提到在作查詢的時候,會使用離線跟實時作統一整合,因此在數據湖裏寫同一個表,在存儲層面能夠省去不少工做,另外也能夠節省存儲空間。
SQL 化是 Talos 實時數倉平臺提供的能力。
從頁面上來看,它包括了幾個功能,左邊是項目管理,右邊包括 Source、Transform 和 Sink。
例如,能夠拖一個 Kafka 的數據源進來,在上面作數據過濾,而後就能夠拖一個 Filter 算子達到過濾邏輯,後面能夠再去作一些 Project,Union 的計算,最後輸出到某個地方就能夠了。
對於能力稍微高一些的同窗,能夠去作一些更高層面的計算。這裏也能夠實現到實時數倉的目的,在裏面建立一些數據源,而後經過 SQL 的方式,把邏輯表示出來,最終把這個數據輸出到某種存儲。
上面是從開發層面來說,在系統層面上,它其實還提供了一些其餘的功能,好比規則校驗,還有開發/測試/上線,在這裏能夠統一管理。此外還有監控,對線上跑的實時任務有不少實時指標,能夠經過查看這些指標來判斷當前的任務是否是正常的狀態。
特徵工程有兩方面的需求:
實時化的另外一個重點是準確性,以前不少特徵工程是離線的,在生產環節裏面存在計算時的數據跟投放過程當中的特徵有誤差,基礎特徵數據不是很準確,所以咱們要求數據要更實時、更準確。
特徵工程的第二個需求是服務穩定性。
下面是在特徵實時化裏的實踐,首先是點擊率預估的需求。
點擊率預估案例的背景如上所示,從投放鏈路上來講,在廣告前端用戶產生觀影行爲,前端會向廣告引擎請求廣告,而後廣告引擎在作廣告召回粗排/精排的時候會拿到用戶特徵和廣告特徵。把廣告返回給前端以後,後續用戶行爲可能產生曝光、點擊等行爲事件,在作點擊率預估的時候,須要把前面請求階段的特徵跟後續用戶行爲流裏的曝光和點擊關聯起來,造成一個 Session 數據,這就是咱們的數據需求。
落實到具體實踐的話包括兩方面:
在實踐過程當中有哪些挑戰?
在時序上來講,特徵確定是早於 Tracking,可是兩個流成功關聯率在 99% 以上的時候,這個特徵須要保留多久?由於在廣告業務中,用戶能夠離線下載一個內容,在下載的時候就已經完成了廣告請求和返回了。可是後續若是用戶在沒有網的狀況下觀看,這個事件並不會立馬返回,只有當狀態恢復的時候,纔會有後續曝光和點擊事件回傳。
因此這個時候,其實特徵流和 Tracking 的時間歸納是很是長的。咱們通過離線的數據分析,若是兩個流的關聯率達 99% 以上,那麼特徵數據就須要保留比較長的時間,目前是保留 7 天,這個量級仍是比較大的。
上圖爲點擊率預測的總體架構,剛纔咱們提到關聯包括兩部分:
可是由於兩個流的時序自己多是錯開的,就是說,當曝光、點擊出現的時候,可能這個特徵尚未到,那麼就拿不到這個特徵。因此咱們作了一個多級重試隊列,保證最終兩個流關聯的完整性。
上圖右邊是更細的講解,闡述了流內事件關聯爲何選擇 CEP 方案。業務需求是把用戶行爲流裏屬於同一次廣告請求,而且是同一個廣告的曝光跟點擊關聯起來。曝光以後,好比 5 分鐘以內產生點擊,做爲一個正樣本,5 分鐘以後出現的點擊則拋棄不要了。
能夠想象一下,當遇到這樣的場景,經過什麼樣的方案能夠實現這樣的效果。其實在一個流裏多個事件的處理,能夠用窗口來實現。但窗口的問題是:
因此當時通過不少技術調研後,發現 Flink 裏的 CEP 能夠實現這樣的效果,用相似政策匹配的方式,描述這些序列須要知足哪些匹配方式。另外它能夠指定一個時間窗口,好比曝光和點擊間隔 15 分鐘。
上圖左邊是匹配規則的描述,begin 裏定義一個曝光,實現曝光以後 5 分鐘以內的點擊,後面是描述一個能夠出現屢次的點擊,within 表示關聯窗口是多長時間。
在生產實踐過程當中,這個方案大部分狀況下能夠關聯上,可是在作數據對比的時候,才發現存在某些曝光點擊沒有正常關聯到。
通過數據分析,發現這些數據自己的特色是曝光跟點擊的時間戳都是毫秒級別,當它們有相同毫秒時間戳的時候,這個事件就不能正常匹配。因而咱們採用一個方案,人爲地對於點擊事件加一毫秒,進行人工錯位,這樣就保證曝光跟點擊可以成功關聯上。
前文提到特徵數據須要保留 7 天,因此狀態是上百 TB。須要把數據放在一個外部存儲裏,所以在作技術選型時對外部存儲有必定的要求:
基於以上幾個點,最終選擇了 HBase,造成上圖的解決方案。
上面一行表示經過 CEP 以後把曝光點擊序列關聯在一塊兒,最下面是把特徵流經過 Flink 寫到 HBase 裏,去作外部狀態存儲,中間核心模塊是用於達到兩個流的關聯。拿到曝光點擊關聯以後去查 HBase 數據,若是可以正常查到,就會把它輸出到一個正常結果流裏。而對於那些不能構成關聯的數據,作了一個多級重試隊列,在屢次重試的時候會產生隊列降級,而且在重試的時候爲了減輕對 HBase 的掃描壓力,重試 Gap 會逐級增長。
另外還有一個退出機制,由於重試不是無限進行的。退出機制的存在緣由主要包括兩個點:
所以,退出機制意味着在重試屢次以後就會過時,而後會到重試過時的數據裏。
在有效點擊場景裏,其實也是兩個流的關聯,可是兩個場景裏的技術選型是徹底不同的。
首先看一下項目背景,在網大場景裏,影片自己就是一個廣告。用戶在點擊以後,就會進入到一個播放頁面。在播放頁面裏,用戶能夠免費觀看 6 分鐘,6 分鐘以後想要繼續觀看,須要是會員或者購買才行,在這裏須要統計的數據是有效點擊,定義是在點擊以後觀影時長超過 6 分鐘便可。
這種場景落實到技術上是兩個流的關聯,包括了點擊流和播放心跳流。
在這個場景裏,兩個流動 Gap 相對比較小,而在電影裏時長通常是兩個多小時,因此點擊以後的行爲,Gap 基本是在三個小時之內才能完成,所以這裏自己的狀態是相對比較小的,使用 Flink 的狀態管理能夠達到這樣的效果。
接下來咱們看一個具體的方案。
從流上來看,綠色部分是點擊流,藍色部分是播放心跳流。
在右邊的播放心跳流裏,這個狀態是對時長作累計,它自己是一個心跳流,好比每三秒傳一個心跳過來。咱們須要在這裏作一個計算,看它累計播放時長是否是達到 6 分鐘了,另外也看當前記錄是否是到了 6 分鐘。對應 Flink 裏的一個實現就是把兩個流經過 Connect 算子關係在一塊兒,而後能夠制定一個 CoProcessFunction,在這裏面有兩個核心算子。
算子給用戶提供了不少靈活性,用戶能夠在裏面作不少邏輯控制。相比不少的 Input Join,用戶可發揮的空間比較大。
針對以上案例作一個小結。如今雙流管理已經很是廣泛,有許多方案能夠選擇,好比 Window join,Interval join,還有咱們使用的 Connect + CoProcessFunction。除此以外,還有一些用戶自定義的方案。
在選型的時候,建議從業務出發,去作對應的技術選型。首先要思考多個流之間的事件關係,而後判斷出狀態是什麼規模,必定程度上能夠從上面不少方案裏排除不可行的方案。
在 Flink 內部主要是經過 Checkpoint 作容錯,Checkpoint 自己是對於 Job 內部的 Task 級別的容錯,可是當 Job 主動或異常重啓時,狀態沒法從歷史狀態恢復。
所以咱們這邊作了一個小的改進,就是一個做業在啓動的時候,它也會去 Checkpoint 裏把最後一次成功的歷史狀態拿到,而後作初始化管理,這樣就達到狀態恢復的效果。
Flink 自己實現端到端精確一次,首先須要開啓 Checkpoint 功能,而且在 Checkpoint 裏指定精確一次的語義。另外,若是在下游好比 Sink 端,它自己支持事務,就能夠結合兩階段提交與 Checkpoint 以及下游的事務作聯動,達到端到端精確一次。
在上圖右邊就是描述了這個過程。這是一個預提交的過程,就是 Checkpoint 協調器在作 Checkpoint 的時候,會往 Source 端注入一些 Barrier 數據,每一個 Source 拿到 Barrier 以後會作狀態存儲,而後把完成狀態反饋給協調器。這樣每一個算子拿到 Barrier,實際上是作相同的一個功能。
到 Sink 端以後,它會在 Kafka 裏提交一個預提交標記,後面主要是 Kafka 自己事務機制來保證的。在全部的算子都完成 Checkpoint 以後,協調器會給全部的算子發一個 ACK,發送一個確認狀態,這時候 Sink 端作一個提交動做就能夠了。
在以前的實踐中咱們發現,下游 Kafka 增長分區數時,新增分區無數據寫入。
原理是 FlinkKafkaProducer 默認使用 FlinkFixedPartitioner,每一個 Task 只會發送到下游對應的一個 Partition 中,若是下游 Kafka 的 Topic 的 Partition 大於當前任務的並行度,就會出現該問題。
解決辦法有兩個:
對於運行中的 Flink 做業,咱們須要查看它自己的一些狀態。好比在 Flink UI 裏面,它的不少指標都是在 Task 粒度,沒有總體的效果。
平臺這邊對這些指標作了進一步的聚合,統一在一個頁面裏面展現。
從上圖能夠看到,展現信息包括反壓狀態,時延狀況以及運行過程當中 JobManager 和 TaskManage 的 CPU / 內存的利用率。另外還有 Checkpoint 的監控,好比它是否超時,最近是否有 Checkpoint 已經失敗了,後面咱們會針對這些監控指標作一些報警通知。
當實時任務運營異常的時候,用戶是須要及時知道這個狀態的,如上圖所示,有一些報警項,包括報警訂閱人、報警級別,下面還有一些指標,根據前面設置的指標值,若是知足這些報警策略規則,就會給報警訂閱人推送報警,報警方式包括郵件、電話以及內部通信工具,從而實現任務異常狀態通知。
經過這種方式,當任務異常的時候,用戶能夠及時知曉這個狀態,而後進行人爲干預。
最後總結一下愛奇藝廣告業務在實時鏈路生產上面的關鍵節點。
以前咱們的 ETL 實時跟離線是分別作的,經過批處理的方式,而後換到 Hive 表裏邊,後面跟的是離線數倉。在實時裏,通過實時 ETL,放到 Kafka 裏邊,而後去作後續的實時數倉。
先在 ETL 作流批一體的第一個好處是離線數倉時效性提高,由於數據須要作反做弊,因此咱們給廣告算法提供基礎特徵的時候,反做弊以後的時效性對於後續總體效果的提高是比較大的,因此若是把 ETL 作成統一實時化以後,對於後續的指導意義很是大。
ETL 作到流批一體以後,咱們會把數據放在數據湖裏面,後續離線數倉和實時數倉均可以基於數據湖實現。流批一體能夠分爲兩個階段,第一階段是先把 ETL 作到一體,另外報表端也能夠放在數據湖裏邊,這樣咱們的查詢服務能夠作到一個更新的量級。由於以前須要離線表跟實時表作一個 Union 的計算,在數據湖裏面,咱們經過離線和實時寫一個表就能夠實現了。
關於將來規劃:
首先是流批一體,這裏包括兩個方面:
另外,如今的反做弊主要是經過離線的方式實現,後面可能會把一些線上的反做弊模型轉成實時化,把風險降到最低。