技術實踐 | 如何基於 Flink 實現通用的聚合指標計算框架

1 引言

網易雲信做爲一個 PaaS 服務,須要對線上業務進行實時監控,實時感知服務的「心跳」、「脈搏」、「血壓」等健康情況。經過採集服務拿到 SDK、服務器等端的心跳埋點日誌,是一個很是龐大且雜亂無序的數據集,而如何纔能有效利用這些數據?服務監控平臺要作的事情就是對海量數據進行實時分析,聚合出表徵服務的「心跳」、「脈搏」、「血壓」的核心指標,並將其直觀的展現給相關同窗。這其中核心的能力即是 :實時分析和實時聚合程序員

在以前的《網易雲信服務監控平臺實踐》一文中,咱們圍繞數據採集、數據處理、監控告警、數據應用 4 個環節,介紹了網易雲信服務監控平臺的總體框架。本文是對網易雲信在聚合指標計算邏輯上的進一步詳述。算法

基於明細數據集進行實時聚合,生產一個聚合指標,業界經常使用的實現方式是 Spark Streaming、Flink SQL / Stream API。不管是何種方式,咱們都須要經過寫代碼來指定數據來源、數據清洗邏輯、聚合維度、聚合窗口大小、聚合算子等。如此繁雜的邏輯和代碼,不管是開發、測試,仍是後續任務的維護,都須要投入大量的人力/物力成本。而咱們程序員要作的即是化繁爲簡、實現大巧不工。數據庫

本文將闡述網易雲信是如何基於 Flink 的 Stream API,實現一套通用的聚合指標計算框架。segmentfault

2 總體架構

image.png

如上圖所示,是咱們基於 Flink 自研的聚合指標完整加工鏈路,其中涉及到的模塊包括:服務器

  • source:按期加載聚合規則,並根據聚合規則按需建立 Kafka 的 Consumer,並持續消費數據。
  • process:包括分組邏輯、窗口邏輯、聚合邏輯、環比計算邏輯等。從圖中能夠看到,咱們在聚合階段分紅了兩個,這樣作的目的是什麼?其中的好處是什麼呢?作過度布式和併發計算的,都會遇到一個共同的敵人:數據傾斜。在咱們 PaaS 服務中頭部客戶會更加明顯,因此傾斜很是嚴重,分紅兩個階段進行聚合的奧妙下文中會詳細說明。
  • sink:是數據輸出層,目前默認輸出到 Kafka 和 InfluxDB,前者用於驅動後續計算(如告警通知等),後者用於數據展現以及查詢服務等。
  • reporter:全鏈路統計各個環節的運行情況,如輸入/輸出 QPS、計算耗時、消費堆積、遲到數據量等。

下文將詳細介紹這幾個模塊的設計和實現思路。微信

3 source

規則配置

爲了便於聚合指標的生產和維護,咱們將指標計算過程當中涉及到的關鍵參數進行了抽象提煉,提供了可視化配置頁面,以下圖所示。下文會結合具體場景介紹各個參數的用途。架構

image.png

規則加載

在聚合任務運行過程當中,咱們會按期加載配置。若是檢測到有新增的 Topic,咱們會建立 kafka-consumer 線程,接收上游實時數據流。同理,對於已經失效的配置,咱們會關閉消費線程,並清理相關的 reporter。併發

數據消費

對於數據源相同的聚合指標,咱們共用一個 kafka-consumer,拉取到記錄並解析後,對每一個聚合指標分別調用 collect() 進行數據分發。若是指標的數據篩選規則(配置項)非空,在數據分發前須要進行數據過濾,不知足條件的數據直接丟棄。app

4 process

總體計算流程

基於 Flink 的 Stream API 實現聚合計算的核心代碼以下所示:框架

SingleOutputStreamOperator<MetricContext> aggResult = src
        .assignTimestampsAndWatermarks(new MetricWatermark())
        .keyBy(new MetricKeyBy())
        .window(new MetricTimeWindow())
        .aggregate(new MetricAggFuction());
  • MetricWatermark():根據指定的時間字段(配置項⑧)獲取輸入數據的 timestamp,並驅動計算流的 watermark 往前推動。
  • MetricKeyBy():指定聚合維度,相似於 MySQL 中 groupby,根據分組字段(配置項⑥),從數據中獲取聚合維度的取值,拼接成分組 key。
  • MetricTimeWindow():配置項⑧中指定了聚合計算的窗口大小。若是配置了定時輸出,咱們就建立滑動窗口,不然就建立滾動窗口。
  • MetricAggFuction():實現配置項②指定的各類算子的計算,下文將詳細介紹各個算子的實現原理。

二次聚合

對於大數據量的聚合計算,數據傾斜是不得不考慮的問題,數據傾斜意味着規則中配置的分組字段(配置項⑥)指定的聚合 key 存在熱點。咱們的計算框架在設計之初就考慮瞭如何解決數據傾斜問題,就是將聚合過程拆分紅2階段:

  • 第1階段:將數據隨機打散,進行預聚合。
  • 第2階段:將第1階段的預聚合結果做爲輸入,進行最終的聚合。

具體實現:判斷併發度參數 parallelism(配置項⑦) 是否大於1,若是 parallelism 大於1,生成一個 [0, parallelism) 之間的隨機數做爲 randomKey,在第1階段聚合 keyBy() 中,將依據分組字段(配置項⑥)獲取的 key 與 randomKey 拼接,生成最終的聚合 key,從而實現了數據隨機打散。

聚合算子

做爲一個平臺型的產品,咱們提供了以下常見的聚合算子。因爲採用了二次聚合邏輯,各個算子在第1階段和第2階段採用了相應的計算策略。

算子 第1階段聚合 第2階段聚合
min/max/sum/count 直接對輸入數據進行預聚合計算,輸出預聚合結果 對第1階段預聚合結果進行二次聚合計算,輸出最終結果
first/last 對輸入數據的 timestamp 進行比較,記錄最小/最大的 timestamp 以及對應的 value 值,輸出 <timestamp,value> 數據對 對 <timestamp,value> 數據對進行二次計算,輸出最終的 first/last
avg 計算該分組的和值和記錄數,輸出 <sum,cnt> 數據對 對 <sum,cnt> 數據對分別求和,而後輸出:總 sum / 總 cntcount
median/tp90/tp95 統計輸入數據的分佈,輸出 NumericHistogram 對輸入的 NumericHistogram 作 merge 操做,最終輸出中位數/tp90/tp95
count-distinct 輸出記錄桶信息和位圖的 RoaringArray 對 RoaringArray 進行 merge 操做,最終輸出精確的去重計數結果
count-distinct(近似) 輸出基數計數對象 HyperLoglog 對 HyperLoglog 進行 merge 操做,最終輸出近似的去重計數結果

對於計算結果受所有數據影響的算子,如 count-distinct(去重計數),常規思路是利用 set 的去重特性,將全部統計數據放在一個 Set 中,最終在聚合函數的 getResult 中輸出 Set 的 size。若是統計數據量很是大,這個 Set 對象就會很是大,對這個 Set 的 I/O 操做所消耗的時間將不能接受。

對於類 MapReduce 的大數據計算框架,性能的瓶頸每每出如今 shuffle 階段大對象的 I/O 上,由於數據須要序列化 / 傳輸 / 反序列化,Flink 也不例外。相似的算子還有 median 和 tp95。

爲此,須要對這些算子作專門的優化,優化的思路就是儘可能減小計算過程當中使用的數據對象的大小,其中:

  • median/tp90/tp95:參考了 hive percentile_approx 的近似算法,該算法經過 NumericHistogram(一種非等距直方圖)記錄數據分佈,而後經過插值的方式獲得相應的 tp 值(median 是 tp50)。
  • count-distinct:採用 RoaringBitmap 算法,經過壓縮位圖的方式標記輸入樣本,最終獲得精確的去重計數結果。
  • count-distinct(近似) :採用 HyperLoglog 算法,經過基數計數的方式,獲得近似的去重計數結果。該算法適用於大數據集的去重計數。

後處理

後處理模塊,是對第2階段聚合計算輸出數據進行再加工,主要有2個功能:

  • 複合指標計算:對原始統計指標進行組合計算,獲得新的組合指標。例如,要統計登陸成功率,咱們能夠先分別統計出分母(登陸次數)和分子(登陸成功的次數),而後將分子除以分母,從而獲得一個新的組合指標。配置項③就是用來配置組合指標的計算規則。
  • 相對指標計算:告警規則中常常要判斷某個指標的相對變化狀況(同比/環比)。咱們利用 Flink 的state,可以方便的計算出同比/環比指標,配置項④就是用來配置相對指標規則。

異常數據的處理

這裏所說的異常數據,分爲兩類:遲到的數據和提早到的數據。

  • 遲到數據

    • 對於嚴重遲到的數據(大於聚合窗口的 allowedLateness),經過 sideOutputLateData 進行收集,並經過 reporter 統計上報,從而可以在監控頁面進行可視化監控。
    • 對於輕微遲到的數據(小於聚合窗口的 allowedLateness),會觸發窗口的重計算。若是每來一條遲到數據就觸發一次第 1 階段窗口的重計算,重計算結果傳導到第 2 階段聚合計算,就會致使部分數據的重複統計。爲了解決重複統計的問題,咱們在第 1 階段聚合 Trigger 中進行了特殊處理:窗口觸發採用 FIRE_AND_PURGE(計算並清理),及時清理已經參與過計算的數據。
  • 提早到的數據:這部分數據每每是數據上報端的時鐘不許致使。在計算這些數據的 timestamp 時要人爲干預,避免影響整個計算流的 watermark。

5 sink

聚合計算獲得的指標,默認輸出到 Kafka 和時序數據庫 InfluxDB。

  • kafka-sink:將指標標識(配置項①)做爲 Kafka 的topic,將聚合結果發送出去,下游接收到該數據流後能夠進一步處理加工,如告警事件的生產等。
  • InfluxDB-sink:將指標標識(配置項①)做爲時序數據庫的表名,將聚合結果持久化下來,用於 API 的數據查詢、以及可視化報表展現等。

6 reporter

爲了實時監控各個數據源和聚合指標的運行狀況,咱們經過 InfluxDB+Grafana 組合,實現了聚合計算全鏈路監控:如各環節的輸入/輸出 QPS、計算耗時、消費堆積、遲到數據量等。

image.png

7 結語

目前,經過該通用聚合框架,承載了網易雲信 100+ 個不一樣維度的指標計算,帶來的收益也是比較可觀的:

  • 提效:採用了頁面配置化方式實現聚合指標的生產,開發週期從天級縮短到分鐘級。沒有數據開發經驗的同窗也可以本身動手完成指標的配置。
  • 維護簡單,資源利用率高:100+ 個指標只需維護 1 個 flink-job,資源消耗也從 300+ 個 CU 減小到 40CU。
  • 運行過程透明:藉助於全鏈路監控,哪一個計算環節有瓶頸,哪一個數據源有問題,一目瞭然。

做者介紹

聖少友,網易雲信數據平臺資深開發工程師,從事數據平臺相關工做,負責服務監控平臺、數據應用平臺、質量服務平臺的設計開發工做。

更多技術乾貨,歡迎關注【網易智企技術+】微信公衆號

相關文章
相關標籤/搜索