你們好,我是來自騰訊大數據團隊的楊華(vinoyang),很高興可以參加此次北京的 QCon,有機會跟你們分享一下騰訊實時流計算平臺的演進與這個過程當中咱們的一些實踐經驗。web
此次分享主要包含四個議題,我會首先闡述一下騰訊在實時計算中使用 Flink 的歷程,而後會簡單介紹一下騰訊圍繞 Flink 的產品化實踐:咱們打造了一個 Oceanus 平臺,同時騰訊雲也早已提供基於 Flink 的實時流計算服務,接着咱們會重點跟你們聊一聊咱們對社區版 Flink 的一些擴展與改進、優化。算法
Flink 在騰訊實時計算概況簡介微信
首先,咱們進入第一個議題。Flink 在騰訊正式被考慮替代 Storm 是在 2017 年。架構
17 年上半年,咱們主要在調研 Flink 替換 Storm 的可行性、特性、性能是否可以知足咱們的上線要求。在此以前,咱們內部以 Storm 做爲實時計算的基礎框架也已經有幾年的時間了,在使用的過程當中也發現了 Storm 的一些痛點,好比,沒有內置狀態的支持,沒有提供完備的容錯能力,沒有內置的窗口 API,core API 沒法提供 Exactly-once 的語義保證等等。app
17 年下半年,咱們從社區拉出當時最新的發佈分支(1.3.2)做爲咱們內部的定製開發分支進行開發。做爲一個試點,咱們選擇了內部一個流量較大的業務來進行替換,這個業務在咱們內部是以 standalone 的模式部署的,因此咱們最初也使用的是 Flink 的 standalone 部署模式。框架
18 年上半年,咱們開始圍繞 Flink 進行產品化,打造了一個全流程、一體化的實時流計算平臺——Oceanus,來簡化業務方構建實時應用的複雜度並下降運維成本,這也基本明確了後續咱們主要的運行模式是 Flink on YARN。運維
18 年下半年,咱們的 Oceanus 平臺已經有足夠的能力來構建常見的流計算應用,咱們部門內部的一些實時流計算業務也已經在平臺上穩定運行,因而咱們開始爲騰訊雲、騰訊其餘事業羣以及業務線提供流計算服務。同時,咱們也將平臺整合進咱們的大數據套件,爲外部私有云客戶提供流計算服務。機器學習
19 年上半年,咱們的主要目標是在 Oceanus 上沉澱並完善上層的場景化服務建設,好比提供在線機器學習、風控等場景化服務。另外,咱們也在 Flink 批處理方向發力,利用 Flink 的計算能力來知足跨數據中心,跨數據源的聯合分析需求。它能夠作到:數據源 SQL 下推,避免集羣帶寬資源浪費;單 DC 內 CBO(基於代價優化),生成最優的執行計劃;跨 DC CBO,根據 DC 負載和資源選擇最佳 DC 執行計算,從而得到更好的資源利用和更快的查詢性能。以上就是騰訊使用 Flink 的整個歷程。ide
這幅圖展現了,Flink 目前在騰訊內部已經爲一些咱們耳熟能詳的產品提供實時計算的服務。這些產品,包括微信、支付、財付通、騰訊雲、QQ、空間、音樂、遊戲、K 歌等等。咱們列舉其中幾個業務的使用案例,微信使用咱們的平臺來統計朋友圈的實時瀏覽信息、小遊戲種子用戶的 UV 計算、實時惡意流量分析判斷、看一看的紅點信息;支付用來計算商戶交易相關的統計;音樂用於實時點唱、熱門排行榜等等。函數
接下來咱們來了解一下,目前 Flink 在騰訊使用的現狀。目前咱們 Oceanus 平臺 YARN 集羣的 vcore 總數目達到了 34 萬,累計的峯值計算能力接近 2.1 億 / 秒,日均處理消息規模近 20 萬億。到目前爲止,騰訊內部除了廣告的在線訓練業務外,原先運行在 Storm 上的實時流計算業務都已逐步遷移到 Flink 引擎上,而廣告這塊的業務預計也會在今年下半年遷移完成。
Oceanus 平臺簡介
接下來,咱們進入第二個議題:簡要介紹一下咱們的 Oceanus 平臺。
首先,咱們來看一下 Oceanus 平臺的總體技術架構。咱們內部定製版的 Flink 引擎,稱之爲 TDFLINK,它跟其餘的一些大數據基礎設施框架交互並協同支撐了咱們上層的 Oceanus 平臺,Oceanus 支持畫布、SQL 以及 Jar 三種形式構建應用,爲了方便業務方下降總體成本,咱們還提供了配置、測試、部署等完整配套的功能,在平臺之上咱們提供了一些領域特定的場景化服務好比 ETL、監控、推薦廣告等。
下面咱們來介紹 Oceanus 的幾個典型功能。首先這是某個用戶的應用列表頁。從列表中,咱們能夠看到應用的當前狀態、類型、迭代的版本,它歸屬於哪一個場景等信息。在操做欄,咱們能夠一鍵對應用進行啓停、調試,查看它的指標信息等,除此以外咱們還提供了不少便捷的操做,好比快速複製一個應用,他們都收納在「更多」菜單按鈕中。
這是咱們的一個指標分鐘級統計的畫布應用詳情頁,咱們爲 ETL 類型的應用提供了一個通用的 transform 算子。它提供了不少功能細分的可插拔的便捷函數來簡化常見的事件解析與提取的複雜度。在圖中,多種不一樣類型的指標通過 split 算子分流後將相同的指標進行歸類,而後再對它們應用各自的統計邏輯,就像這裏的窗口同樣,基本上每一個算子都是配置化的。像這種類型的應用咱們經過拖拽、配置就能夠輕鬆完成它的構建。
這幅圖展現了咱們的指標詳情頁檢查點的指標明細,爲了讓業務人員更直觀地瞭解它們最關心的指標信息,咱們將一些必要的指標進行了從新梳理並展現到咱們的平臺上,這裏有些指標直接使用了 Flink 提供的 REST API 接口,而有些指標則是咱們內部擴展定製的。
最後,咱們來介紹一下最近上線的在線機器學習模塊。這是咱們一個模型訓練應用的詳情頁,一樣它也是畫布類型的,咱們對常規的機器學習類型的應用進行了步驟拆分,包括了數據預處理、算法等相關步驟,每一個步驟均可以進行拖拽,再加上配置的方式就能夠完成一個 ML 類型的應用建立。
對於訓練獲得的模型,咱們也提供了模型服務功能,咱們用模型服務組來管理每一個模型的不一樣時間的版本,點擊右側的「評估報告」能夠查看這個模型的 AUC 趨勢。
以上是對 Oceanus 平臺的介紹,若是你們有興趣能夠掃描 PPT 最後的二維碼來進一步瞭解咱們的平臺以及騰訊雲上的流計算服務。
針對 Flink 的擴展與優化
接下來,咱們進入下一個議題,介紹咱們內部 Flink 版本在經過騰訊雲對外提供服務時基於內部以及業務的相關需求對社區版的擴展與優化。
第一個改進是咱們重構了 Flink Web UI,咱們重構的緣由是由於社區版的 Flink Web UI 在定位問題的時候不能提供足夠的信息,致使問題定位的效率不夠高。尤爲是 Job 並行度很是大,YARN 的 container 數目很是多的時候,當 Job 發生失敗,很難快速去找到 container 和節點以查看進程的堆棧或者機器指標。因此,爲了更高效地定位問題,咱們對 Flink web UI 進行了重構並暴露了一些關鍵指標。
這是咱們一個 TaskManager 的詳情頁,咱們爲它新增了一個「Threads」 Tab,咱們能夠經過它看到 Task 相關的線程信息:線程名稱、CPU 消耗、狀態以及堆棧等。這樣一旦哪一個算子的線程可能成爲瓶頸時,咱們能夠快速定位到它阻塞在什麼方法調用上。
接下來的這個改進是對 JobManager failover 的優化。你們應該都知道社區版的 Flink JobManager HA 在 Standalone 模式下有個很大的問題是:它的 standby 節點是冷備的,JobManager 的切換會致使它管理的全部 Job 都會被重啓恢復,這一行爲在咱們現網環境中是不可接受的。因此,咱們首先定製的第一個大特性就是 JobManager 的 failover 優化,讓 standby 節點變成熱備,這使得 JobManager 的切換對 TaskManager 上已經正在運行的 Job 不產生影響。咱們已經對 Standalone 以及 Flink on YARN 這兩種部署模式支持了這個特性,Flink on YARN 的支持還處於內部驗證階段。咱們以對 Standalone 模式的優化爲例來進行分析,它主要包含這麼幾個步驟:
取消 JobManager 跟 TaskManager 由於心跳超時或 Leadership 變更就 cancel task 的行爲;
對 ExecutionGraph 核心數據的快照;
經過 ExecutionGraphBuilder 重構空的 ExecutionGraph 加上快照重置來恢復出一個跟原先等價的 ExecutionGraph 對象;
TaskManager 跟新的 JobManager leader 創建鏈接後以心跳上報本身的狀態和必要的信息;
新的 JobManager 確認在 reconcile 階段 Job 的全部 task 是否正常運行。
接下來的這個改進已經在反饋社區的過程當中,它就是對檢查點失敗處理的改進。在探討改進以前,咱們先來了解一下社區版當前的處理機制。JobMaster 中,每一個 Job 會對應一個 Checkpoint Coordinator,它用來管理並協調 Job 檢查點的執行。當到達一個檢查點的觸發週期,Coordinator 會對全部的 Source Task 下發 TriggerCheckpoint 消息,source task 會在自身完成快照後向下游廣播 CheckpointBarrier,做爲下游 task 觸發的通知。其中,若是一個 task 在執行檢查點時失敗了,這取決於用戶是否容忍這個失敗(經過一個配置項),若是選擇不容忍那麼這個失敗將變成一個異常致使 task 的失敗,與此同時 task 的失敗將會通知到 JobMaster,JobMaster 將會通知這個 Job 的其餘 task 取消它們的執行。現有的機制存在一些問題:
Coordinator 並不能控制 Job 是否容忍檢查點失敗,由於控制權在 task 端;
Coordinator 當前的失敗處理代碼邏輯混亂,區分出了觸發階段,卻忽略了執行階段;
沒法實現容忍多少個連續的檢查點失敗則讓 Job 失敗的邏輯。
瞭解了現有的實現機制,咱們再來看接下來的改進方案。首先,咱們對源碼中 checkpoint package 下的相關類進行了重構,使得它再也不區分觸發階段,引進了更多的檢查點失敗緣由的枚舉並重構了相關的代碼。而後咱們引入了 CheckpointFailureManager 組件,用來統一失敗管理,同時爲了適配更靈活的容忍失敗的能力,咱們引入了檢查點失敗計數器機制。如今,當咱們遇到檢查點失敗後,這個失敗信息會直接上報到 Coordinator,而是否要讓 Job 失敗具體的決策則由 CheckpointFailureManager 做出,這就使得 Coordinator 具備了完整的檢查點控制權,而決策權轉讓給 CheckpointFailureManager,則充分實現了邏輯解耦。
下面咱們要看的這個特性是對 Flink 原生窗口的加強,因此咱們稱之爲 Enhanced window。你們都知道 Flink 的 EventTime 語義的窗口沒法保證任意延遲到達的數據都能參與窗口計算,它只容許你設置一個容忍延遲的時間。但咱們的應用場景裏,數據的延遲可能很是高,甚至有時跨天的也會發生,但咱們沒法爲常規的窗口設置這麼長的延遲時間,而且咱們的業務沒法容忍延遲數據被丟棄的行爲。所以針對這種場景,Flink 自帶的窗口沒法知足咱們的需求。因此,咱們對它作了一些改進,它可以容忍任意延遲到來的事件,全部的事件都不會被丟棄,而是會加入一個新的窗口從新計算,新窗口跟老窗口毫無關係,因此最終可能針對一個窗口在用戶的目標表中會存在多條記錄,用戶只需自行聚合便可。
爲了方便在上層使用這種窗口,咱們爲它定製了 SQL 關鍵字,這幅圖展現了咱們在指標統計場景中使用它的一個示例。
這是咱們根據業務需求所定製的另外一個窗口——增量窗口。在業務中常常遇到這樣的需求:但願看到一個窗口週期內的增量變化,這個窗口週期可能會很長,好比一個天級別的窗口。好比咱們但願看到一天內每一個小時的 PV 增加趨勢,或者遊戲中的一些虛擬物品的消耗趨勢。Flink 默認的翻滾窗口以及觸發器是沒有內置這種窗口內小批次觸發的功能。固然咱們也能夠經過一個個的小窗口來計算階段性的結果,而後再對數據進行二次處理,但這樣會比較麻煩。因此咱們實現了大窗口內屢次增量觸發的功能,擴展實現了一個窗口內屢次觸發的 Trigger,並定製了相應的 SQL 語法來供業務使用。這裏咱們能夠看到雖然是大窗口,但因爲數據都在不斷地進行增量聚合,因此並不會 hold 住很是大的狀態集。
這幅圖展現了增量窗口的使用方式,經過新的關鍵字,底層會映射到咱們自實現的觸發器。
接下來咱們要看的這個特性是咱們對 Flink keyBy 的優化,咱們稱之爲:LocalKeyBy。咱們在使用 KeyBy 的時候都遇到過數據熱點的問題,也就是數據傾斜。數據傾斜主要是業務數據的 key 取值不夠離散,而 keyBy 背後是 hash 的 partition 方式,它根據一個 key 的 hash 值來決定數據要落到哪一個節點分區。若是發生數據傾斜很容易形成計算資源利用不均以及反壓(back pressure)等問題產生。針對這一點,咱們在保證計算語義的狀況下對 keyBy 進行了優化,開發了 LocalKeyBy 功能。它的原理是經過本地預聚合來減小發送的數據量,但這裏須要注意的一點是:使用這個算子的時候,須要對原有的實現代碼進行調整,由於它將原來的 keyBy 拆分爲了兩步:預聚合以及合併。
咱們在本地對 keyBy 與 LocalKeyBy 作了一個簡單的性能對比測試,發如今流量傾斜嚴重的狀況下,使用 LocalKeyBy 總體性能並無受到太大的影響,但 Flink 原生的 keyBy 則隨着流量的傾斜而產生顯著的性能降低。
咱們繼續來看一個特性:水位線算子定時檢測流分區空閒的功能。Flink 社區目前針對 Source 實現了定時的流 idle 檢測功能(雖然沒有開放),它主要針對的場景是 Kafka 某個分區空閒無數據從而形成對應的 subtask 沒法正常提取 watermark,致使對下游的計算產生影響。
但咱們的場景和社區略有差異,咱們沒有將全部的邏輯都壓到 source 裏,爲了進行邏輯拆分咱們引入了一個 transform 算子,它專門針對 ETL 的場景,因此咱們的 watermark 不少狀況下不在 source 算子上提取,而是在下游的某個算子上,在某些狀況下,若是 watermark 的分配算子在 filter 之類的算子後面,則可能形成某個 pipeline 在中間斷流,也形成了沒法正常提取 watermark 的情形。針對這種場景,咱們在提取 watermark 的算子上也實現了定時檢測流 idle 的功能。這樣就算由於某個分區的數據都被過濾掉形成空閒,也不至於對下游的計算產生影響。
咱們介紹的下一個特性是 Framework 與用戶業務日誌的分離。這個特性其實最受益的是 Standalone 部署模式,由於這種模式下多 job 的 task 是混合部署在同一個 TaskManager 中的,而 TaskManager 自己只使用一個日誌文件來記錄日誌。因此,這致使排查業務問題很是麻煩。另外,咱們對 Flink web UI 展現日誌文件也作了一些改進,咱們會列出 JobManager 以及 TaskManager 的日誌文件夾中全部的文件列表。這是由於,隨着流應用長時間運行,累積的日誌量會愈來愈大,咱們一般都會對應用的日誌配置滾動策略,除此以外咱們還會輸出 GC 日誌等,而 Flink 的 web UI 默認只能展現最新的那個日誌文件,這對於咱們定位問題很不方便。因此,咱們引入了一個新的 tab,它可以列出日誌文件夾下的文件列表,點擊後再請求特定的日誌文件。
在分析這個特性的實現以前,咱們須要先了解 Flink 目前加載日誌框架類的方式,它爲了不跟業務 Job 中可能包含的日誌框架的依賴、配置文件產生衝突,日誌相關類的加載都代理給平臺的類加載器,也就是 TaskManager 的類加載器,而 TaskManager 自己加載的這些類都是從 Flink 安裝包的 lib 底下加載的。而關於日誌配置文件,Flink 經過 JVM 啓動參數來指定配置文件路徑以及日誌文件路徑。這些機制共同保證了 Flink 不會受到用戶 job jar 的干擾。
因此,若是咱們要實現日誌分離,咱們就須要打破 Flink 原先的實現機制,關鍵點在於:爲不一樣 Job 的 Task 加載不一樣的日誌類;爲不一樣 Job 的 Task 指定不一樣的配置文件以及用戶日誌文件的路徑。這意味着咱們須要定製 Flink 自帶的 user classloader。針對第一點,咱們再也不將這些日誌類的加載代理給平臺的加載器,而是將平臺類加載器中日誌相關的 jar 的 classpath 加入到各個 task 本身的 classloader 中。
關於配置文件,咱們顯然也不能用 Flink 平臺的配置文件。咱們會拿平臺使用的配置文件做爲模板,對其內部的日誌路徑進行動態修改,而後將內存中的這個配置文件傳遞給特定的日誌框架。那麼這裏就有一個問題,內存中的配置文件二進制數據怎麼被日誌框架讀取。log4j 以及 logback 均可以接收配置文件的 URL 表示,而 URL 也能夠接收一個 URLStreamHandler 的實現(它是全部流協議的處理器用於鏈接二進制的數據流與 URL),經過效仿 bytebuddy(一個動態修改 Java 二進制字節碼的類庫),咱們實現了 ByteArrayUrlStreamHandler 來進行二進制的配置文件跟 URL 之間的銜接,這兩點完成後不一樣 Job 的 Task 的類加載器就保證了日誌類加載和配置的徹底獨立性。
目前,咱們內部所定製優化的一些特性有些已逐步反饋給社區,還有一些比較大的改動也在跟社區商討合併計劃。咱們歡迎有志於迎接萬億級數據規模挑戰以及參與 Flink 引擎研發的同窗加入咱們。