網易雲信做爲一個 PaaS 服務,須要對線上業務進行實時監控,實時感知服務的「心跳」、「脈搏」、「血壓」等健康情況。經過採集服務拿到 SDK、服務器等端的心跳埋點日誌,是一個很是龐大且雜亂無序的數據集,而如何纔能有效利用這些數據?服務監控平臺要作的事情就是對海量數據進行實時分析,聚合出表徵服務的「心跳」、「脈搏」、「血壓」的核心指標,並將其直觀的展現給相關同窗。這其中核心的能力即是 :實時分析和實時聚合。程序員
在以前的《網易雲信服務監控平臺實踐》一文中,咱們圍繞數據採集、數據處理、監控告警、數據應用 4 個環節,介紹了網易雲信服務監控平臺的總體框架。本文是對網易雲信在聚合指標計算邏輯上的進一步詳述。算法
基於明細數據集進行實時聚合,生產一個聚合指標,業界經常使用的實現方式是 Spark Streaming、Flink SQL / Stream API。不管是何種方式,咱們都須要經過寫代碼來指定數據來源、數據清洗邏輯、聚合維度、聚合窗口大小、聚合算子等。如此繁雜的邏輯和代碼,不管是開發、測試,仍是後續任務的維護,都須要投入大量的人力/物力成本。而咱們程序員要作的即是化繁爲簡、實現大巧不工。數據庫
本文將闡述網易雲信是如何基於 Flink 的 Stream API,實現一套通用的聚合指標計算框架。segmentfault
如上圖所示,是咱們基於 Flink 自研的聚合指標完整加工鏈路,其中涉及到的模塊包括:服務器
下文將詳細介紹這幾個模塊的設計和實現思路。微信
爲了便於聚合指標的生產和維護,咱們將指標計算過程當中涉及到的關鍵參數進行了抽象提煉,提供了可視化配置頁面,以下圖所示。下文會結合具體場景介紹各個參數的用途。架構
在聚合任務運行過程當中,咱們會按期加載配置。若是檢測到有新增的 Topic,咱們會建立 kafka-consumer 線程,接收上游實時數據流。同理,對於已經失效的配置,咱們會關閉消費線程,並清理相關的 reporter。併發
對於數據源相同的聚合指標,咱們共用一個 kafka-consumer,拉取到記錄並解析後,對每一個聚合指標分別調用 collect() 進行數據分發。若是指標的數據篩選規則(配置項⑤)非空,在數據分發前須要進行數據過濾,不知足條件的數據直接丟棄。app
基於 Flink 的 Stream API 實現聚合計算的核心代碼以下所示:框架
SingleOutputStreamOperator<MetricContext> aggResult = src .assignTimestampsAndWatermarks(new MetricWatermark()) .keyBy(new MetricKeyBy()) .window(new MetricTimeWindow()) .aggregate(new MetricAggFuction());
對於大數據量的聚合計算,數據傾斜是不得不考慮的問題,數據傾斜意味着規則中配置的分組字段(配置項⑥)指定的聚合 key 存在熱點。咱們的計算框架在設計之初就考慮瞭如何解決數據傾斜問題,就是將聚合過程拆分紅2階段:
具體實現:判斷併發度參數 parallelism(配置項⑦) 是否大於1,若是 parallelism 大於1,生成一個 [0, parallelism) 之間的隨機數做爲 randomKey,在第1階段聚合 keyBy() 中,將依據分組字段(配置項⑥)獲取的 key 與 randomKey 拼接,生成最終的聚合 key,從而實現了數據隨機打散。
做爲一個平臺型的產品,咱們提供了以下常見的聚合算子。因爲採用了二次聚合邏輯,各個算子在第1階段和第2階段採用了相應的計算策略。
算子 | 第1階段聚合 | 第2階段聚合 |
---|---|---|
min/max/sum/count | 直接對輸入數據進行預聚合計算,輸出預聚合結果 | 對第1階段預聚合結果進行二次聚合計算,輸出最終結果 |
first/last | 對輸入數據的 timestamp 進行比較,記錄最小/最大的 timestamp 以及對應的 value 值,輸出 <timestamp,value> 數據對 | 對 <timestamp,value> 數據對進行二次計算,輸出最終的 first/last |
avg | 計算該分組的和值和記錄數,輸出 <sum,cnt> 數據對 | 對 <sum,cnt> 數據對分別求和,而後輸出:總 sum / 總 cntcount |
median/tp90/tp95 | 統計輸入數據的分佈,輸出 NumericHistogram | 對輸入的 NumericHistogram 作 merge 操做,最終輸出中位數/tp90/tp95 |
count-distinct | 輸出記錄桶信息和位圖的 RoaringArray | 對 RoaringArray 進行 merge 操做,最終輸出精確的去重計數結果 |
count-distinct(近似) | 輸出基數計數對象 HyperLoglog | 對 HyperLoglog 進行 merge 操做,最終輸出近似的去重計數結果 |
對於計算結果受所有數據影響的算子,如 count-distinct(去重計數),常規思路是利用 set 的去重特性,將全部統計數據放在一個 Set 中,最終在聚合函數的 getResult 中輸出 Set 的 size。若是統計數據量很是大,這個 Set 對象就會很是大,對這個 Set 的 I/O 操做所消耗的時間將不能接受。
對於類 MapReduce 的大數據計算框架,性能的瓶頸每每出如今 shuffle 階段大對象的 I/O 上,由於數據須要序列化 / 傳輸 / 反序列化,Flink 也不例外。相似的算子還有 median 和 tp95。
爲此,須要對這些算子作專門的優化,優化的思路就是儘可能減小計算過程當中使用的數據對象的大小,其中:
後處理模塊,是對第2階段聚合計算輸出數據進行再加工,主要有2個功能:
這裏所說的異常數據,分爲兩類:遲到的數據和提早到的數據。
遲到數據:
聚合計算獲得的指標,默認輸出到 Kafka 和時序數據庫 InfluxDB。
爲了實時監控各個數據源和聚合指標的運行狀況,咱們經過 InfluxDB+Grafana 組合,實現了聚合計算全鏈路監控:如各環節的輸入/輸出 QPS、計算耗時、消費堆積、遲到數據量等。
目前,經過該通用聚合框架,承載了網易雲信 100+ 個不一樣維度的指標計算,帶來的收益也是比較可觀的:
聖少友,網易雲信數據平臺資深開發工程師,從事數據平臺相關工做,負責服務監控平臺、數據應用平臺、質量服務平臺的設計開發工做。
更多技術乾貨,歡迎關注【網易智企技術+】微信公衆號