來源 | eBay Unified Monitoring Platform
翻譯 | 顧欣怡html
Sherlock.IO 是 eBay 現有的監控平臺,天天要處理上百億條日誌、事件和指標。Flink Streaming job 實時處理系統用於處理其中的日誌和事件。本文將結合監控系統 Flink 的現狀,具體講述 Flink 在監控系統上的實踐和應用,但願給同業人員一些借鑑和啓發。算法
eBay 的監控平臺 Sherlock.IO 天天處理着上百億條日誌(log),事件(event)和指標(metric)。經過構建 Flink Streaming job 實時處理系統,監控團隊可以及時將日誌和事件的處理結果反饋給用戶。當前,監控團隊維護着 8 個 Flink 集羣,最大的集羣規模達到上千個 TaskManager,總共運行着上百個做業(job),一些做業已經穩定運行了半年以上。apache
爲了讓用戶和管理員可以更加快捷地建立Flink做業並調整參數,監控團隊在 Flink 上搭建了一套元數據微服務(metadata service),該服務可以用Json來描述一個做業的 DAG,且相同的 DAG 共用同一個做業,可以更加方便地建立做業,無需調用 Flink API。Sherlock.IO 流處理總體的架構如圖1所示。網絡
圖1 Sherlock.IO 流處理總體架構架構
目前,用這套元數據微服務建立的做業僅支持以 Kafka 做爲數據源,只要數據接入到 Kafka,用戶就能夠定義 Capability 來處理邏輯從而經過 Flink Streaming 處理數據。框架
元數據微服務框架如圖 2 所示,最上層是元數據微服務提供的 Restful API, 用戶經過調用 API 來描述和提交做業。描述做業的元數據包含三個部分:Resource,Capability 和 Policy。Flink 適配器(Adaptor)鏈接了 Flink Streaming API 和元數據微服務 API,且會根據元數據微服務描述的做業調用 Flink Streaming API 來建立做業,從而屏蔽 Flink StreamAPI。運維
所以,用戶不用瞭解 Flink Streaming API 就能夠建立 Flink 做業。將來若是須要遷移到其餘的流處理框架,只要增長一個適配器,就能夠將現有的做業遷移到新的流處理框架上。微服務
圖2 元數據微服務框架工具
Capability 定義了做業的 DAG 以及每一個算子(Operator)所用的 Class,圖 3 是事件處理(eventProcess) Capability,它最終會生成如圖 4 的 DAG。事件處理 Capability 先從 Kafka 讀出數據,再寫到 Elasticsearch 中。該 Capability 將該做業命名爲「eventProcess」,並定義其並行度爲「5」,其算子爲「EventEsIndexSinkCapability」, 其數據流爲「Source –> sink」。性能
圖3 eventESSink Capability
圖4 生成的Flink做業
每一個命名空間(Namespace)須要定義一個或多個 Policy,每一個 Policy 指定了相應的 Capability,即指定了用哪一套 DAG 來運行這個 Policy。Policy 還定義了這個做業的相關配置,例如從哪一個 Kafka topic 中讀取數據,寫到 ElasticSearch 的哪一個索引(Index)中,中間是否要跳過某些算子等等。
其次,Policy 還能做爲一個簡易的過濾器(Filter),能夠經過配置 Jexl 表達式過濾掉一些不須要的數據,提升做業的吞吐量。
另外,咱們還實現了 Zookeeper 定時更新的機制,使得 Policy 修改後再也不須要重啓做業,只要是在更新時間間隔內,該命名空間的 Policy 修改就會被自動應用到做業上。圖 5 是命名空間爲 paas 的 Policy 示例。
圖5 paas alertESSink Policy
Resource 定義了某個命名空間所須要的資源,好比 Flink 集羣, Kafka broker,ES 集羣等等。咱們有多個 Flink 集羣和 ES 集羣,經過 Resource 配置,做業能夠知道某個命名空間的日誌應該寫到哪一個 ES 集羣,並能夠判斷該命名空間的數據應該從哪一個 Kafka 集羣讀取。
爲了減小做業數量,咱們可讓相同的 DAG 複用同一個做業。咱們先給不一樣的 Policy 指定相同的 Capability,在該 Capability 資源足夠的狀況下,這些 Policy 就會被調度到同一個做業上。
以 SQL 的 Capability 爲例,每一個 Policy 的 SQL 語句不盡相同,若是爲每一個 Policy 都建立一個做業, Job Manager 的開銷就會很大,且很差管理。所以,咱們能夠爲 SQL Capability 配置 20 個 Slot,每一個 Policy 佔用一個 Slot。那麼該 Capability 生成的做業就能夠運行 20 個 Policy。
做業運行時,從 Source 讀進來的數據會被打上相應 Policy 的標籤,並執行該 Policy 定義的 SQL 語句,從而實現不一樣 Policy 共享同一個做業,大大減小了做業的數量。
用共享做業還有一個好處:若是多個命名空間的數據在一個 Kafka topic 裏,那麼只要讀一遍數據便可,不用每一個命名空間都讀一次 topic 再過濾,這樣就大大提升了處理的效率。
瞭解元數據驅動後,讓咱們來看看能夠經過哪些方法實現 Flink 做業的而優化和監控。
在 Flink 集羣的運維過程當中,咱們很難監控做業的運行狀況。即便開啓了檢查點(checkpoint),咱們也沒法肯定是否丟失數據或丟失了多少數據。所以,咱們爲每一個做業注入了 Heartbeat 以監控其運行狀況。
Heartbeat 就像 Flink 中用來監控延遲的「LatencyMarker」同樣,它會流過每一個做業的管道。但與 LatencyMarker 不一樣的是,當 Heartbeat 遇到 DAG 的分支時,它會分裂並流向每一個分支,而不像 LatencyMarker 那樣隨機流向某一個分支。另外一個不一樣點在於 Heartbeat 不是由 Flink 自身產生,而是由元數據微服務定時產生,然後由每一個做業消費。
如圖 4 所示,每一個做業在啓動的時候會默認加一個 Heartbeat 的數據源。Heartbeat 流入每一個做業後,會隨數據流一塊兒通過每一個節點,在每一個節點上打上當前節點的標籤,而後跳過該節點的處理邏輯流向下個節點。直到 Heartbeat 流到最後一個節點時,它會以指標(Metric)的形式發送到 Sherlock.IO(eBay 監控平臺)。
該指標包含了 Heartbeat 產生的時間,流入做業的時間以及到達每一個節點的時間。經過這個指標,咱們能夠判斷該做業在讀取 kafka 時是否延時,以及一條數據被整個管道處理所用的時間和每一個節點處理數據所用的時間,進而判斷該做業的性能瓶頸。
因爲 Heartbeat 是定時發送的,所以每一個做業收到的 Heartbeat 個數應該一致。若最後發出的指標個數與指望不一致,則能夠進一步判斷是否有數據丟失。
圖 6 描述了某 Flink 做業中的數據流以及 Heartbeat 的運行狀態:
圖6 Heartbeat在做業中的運行過程
有了 Heartbeat,咱們就能夠用來定義集羣的可用性。首先,咱們須要先定義在什麼狀況下屬於不可用的:
當內存不足(OutofMemory)或代碼運行錯誤時,做業就可能會意外重啓。咱們認爲重啓過程當中形成的數據丟失是不可用的狀況之一。所以咱們的目標之一是讓 Flink 做業可以長時間穩定運行。
有時由於基礎設施的問題致使物理機或者容器沒啓動起來,或是在 Flink 做業發生重啓時因爲 Slot 不夠而沒法啓動,或者是由於 Flink 做業的重啓次數已經超過了最大重啓次數(rest.retry.max-attempts), Flink 做業就會停止。此時須要人工干預才能將做業從新啓動起來。
咱們認爲 Flink 做業停止時,也是不可用的狀況之一。
發生這種狀況,通常是由於遇到了反壓(BackPressure)。形成反壓的緣由有不少種,好比上游的流量過大,或者是中間某個算子的處理能力不夠,或者是下游存儲節點遇到性能瓶頸等等。雖然短期內的反壓不會形成數據丟失,但它會影響數據的實時性,最明顯的變化是延遲這個指標會變大。
咱們認爲反壓發生時是不可用的狀況之一。
針對以上三種狀況,咱們均可以用 Heartbeat 來監控,並計算可用性。好比第一種狀況,若是做業重啓時發生了數據丟失,那麼相應的那段管道的 Heartbeat 也會丟失,從而咱們能夠監測出是否有數據丟失以及粗粒度地估算數據丟了多少。對於第二種狀況,看成業停止時,HeartBeat 也不會被處理,所以能夠很快發現做業中止運行並讓 on-call 及時干預。第三種狀況當反壓發生時,HeartBeat 也會被阻塞在發生反壓的上游,所以 on-call 也能夠很快地發現反壓發生並進行人工干預。
綜上,Heartbeat 能夠很快監測出 Flink 做業的運行狀況。那麼,如何評估可用性呢?因爲 Heartbeat 是定時發生的,默認狀況下咱們設置每 10 秒發一次。1 分鐘內咱們指望每一個做業的每條管道可以發出 6 個帶有做業信息的 heartbeat,那麼天天就能夠收到 8640 個 Heartbeat。
所以,一個做業的可用性能夠定義爲:
Slot 是 Flink 運行做業的最小單位[1],每一個 TaskManager 能夠分配一個至多個 Slot(通常分配的個數爲該 TaskManager 的 CPU 數)。根據 Flink 做業的並行度,一個做業能夠分配到多個 TaskManager 上,而一個 TaskManager 也可能運行着多個做業。然而,一個 TaskManager 就是一個 JVM,當多個做業分配到一個 TaskManager 上時,就會有搶奪資源的狀況發生。
例如,我一個 TaskManager 分配了 3 個 Slot(3 個 CPU)和 8G 堆內存。當 JobManager 調度做業的時候,有可能將 3 個不一樣做業的線程調度到該 TaskManager 上,那麼這 3 個做業就會同時搶奪 CPU 和內存的資源。當其中一個做業特別耗 CPU 或內存的時候,就會影響其餘兩個做業。
在這種狀況下,咱們經過配置 Flink 能夠實現做業的隔離,如圖 7 所示:
圖7 Flink 做業隔離先後的調度圖
經過配置:
「taskmanager.numberOfTaskSlots: 1」:能夠設置每一個TaskManager只有一個Slot;
「cpu_period」和「cpu_quota」:能夠限定每一個TaskManager的CPU個數
「taskmanager.heap.mb」能夠配置每一個TaskManager的JVM的內存大小。
經過以上配置,能夠限定每一個 TaskManager 獨佔 CPU 和內存的資源,且不會多個做業搶佔,實現做業之間的隔離。
咱們運維 Flink 集羣的時候發現,出現最多的問題就是反壓。在 3.2 中提到過,發生反壓的緣由有不少種,但不管什麼緣由,數據最終都會被積壓在發生反壓上游的算子的本地緩衝區(localBuffer)中。
咱們知道,每個 TaskManager 有一個本地緩衝池, 每個算子數據進來後會把數據填充到本地緩衝池中,數據從這個算子出去後會回收這塊內存。當被反壓後,數據發不出去,本地緩衝池內存就沒法釋放,致使一直請求緩衝區(requestBuffer)。
因爲 Heartbeat 只能監控出是否發生了反壓,但沒法定位到是哪一個算子出了問題,所以咱們定時地將每一個算子的 StackTrace 打印出來,當發生反壓時,經過 StackTrace 就能夠知道是哪一個算子的瓶頸。
如圖8所示,咱們能夠清晰地看到發生反壓的 Flink 做業及其所在的 Taskmanager。再經過 Thread Dump,咱們就能夠定位到代碼的問題。
圖8 發生反壓的StackTrace (點擊觀看大圖)
Flink 自己提供了不少有用的指標[2]來監控 Flink 做業的運行狀況,在此基礎上咱們還加了一些業務上的指標。除此以外,咱們還使用瞭如下工具監控 Flink 做業。
Flink 的 History server[3]能夠查詢已完成做業的狀態和指標。好比一個做業的重啓次數、它運行的時間。咱們經常用它找出運行不正常的做業。好比,咱們能夠經過 History server 的 attempt 指標知道每一個做業重啓的次數,從而快速去現場找到重啓的緣由,避免下次再發生。
雖然 Flink 有 HA 的模式,但在極端狀況下,例如整個集羣出現問題時,須要 on-call 即時發覺並人工干預。咱們在元數據微服務中保存了最後一次提交做業成功的元數據,它記錄了在每一個 Flink 集羣上應該運行哪些做業。守護線程(Daemon thread)會每分鐘去比較這個元數據和 Flink 上運行的做業,若發現 JobManager 連不通或者有做業運行不一致則馬上發出告警(Alert)通知 on-call。
下面介紹幾個已經運行在監控系統上的 Flink 流處理系統的應用:
當前監控團隊是基於 Flink Streaming 作事件告警(Event alerting),咱們定義了一個告警算子 EventAlertingCapability,該 Capability 能夠處理每一個 Policy 自定義的規則。如圖 9 定義的一條性能監控規則:
該規則的含義是當性能檢測器的應用爲「r1rover」, 主機以「r1rover」開頭,且數值大於 90 時,就觸發告警。且生成的告警會發送到指定的 Kafka topic 中供下游繼續處理。
圖9 Single-Threshold1 Policy (點擊查看大圖)
Eventzon 就像 eBay 的事件中心,它收集了從各個應用,框架,基礎架構發過來的事件,最後經過監控團隊的 Flink Streaming 實時生成告警。因爲各個事件的數據源不一樣,它們的元數據也不一樣,所以沒法用一條統一的規則來描述它。
咱們專門定義了一套做業來處理 Eventzon 的事件,它包含了多個 Capability,好比 Filter Capability,用來過濾非法的或者不符合條件的事件; 又好比 Deduplicate Capability,能夠用來去除重複的事件。Eventzon 的全部事件通過一整套做業後,會生成有效的告警,並根據通知機制經過 E-mail、Slack 或 Pagerduty 發給相關團隊。
Netmon 的全稱爲 Network Monitoring, 即網絡監控,它能夠用來監控整個 eBay 網絡設備的健康狀態。它的數據源來自 eBay 的交換機,路由器等網絡設備的日誌。Netmon 的做用是根據這些日誌找出一些特定的信息,每每是一些錯誤的日誌,以此來生成告警。
eBay 的每一臺設備都要「登記造冊」,每臺設備將日誌發過來後,咱們經過 EnrichCapability 從「冊子」中查詢這臺設備的信息,並把相關信息好比 IP 地址,所在的數據中心,所在的機架等填充到日誌信息中做爲事件保存。當設備產生一些特定的錯誤日誌時, 它會被相應的規則匹配而後生成告警,該告警會被 EventProcess Capability 保存到 Elasticsearch 中實時顯示到 Netmon 的監控平臺(dashboard)上。有時由於網絡抖動致使一些短暫的錯誤發生,但系統過一下子就會自動恢復。
當上述狀況發生時,Netmon 會有相應的規則將發生在網絡抖動時生成的告警標記爲「已解決」(Resolved)。對於一些必須人工干預的告警,運維人員能夠經過網絡監控平臺(Netmon dashboard)手動點擊「已解決」,完成該告警的生命週期。
eBay 的監控團隊但願能根據用戶提供的指標、事件和日誌以及相應的告警規則實時告警用戶。Flink Streaming 可以提供低延時的處理從而可以達到咱們低延時的要求,而且它適合比較複雜的處理邏輯。
然而在運維 Flink 的過程當中,咱們也發現了因爲做業重啓等緣由致使誤報少報告警的狀況發生,從而誤導客戶。所以從此咱們會在 Flink 的穩定性和高可用性上投入更多。咱們也但願在監控指標、日誌上可以集成一些複雜的 AI 算法,從而可以生成更加有效精確的告警,成爲運維人員的一把利器。
參考文獻:
[1]https://ci.apache.org/project...
[2]https://ci.apache.org/project...
[3]https://ci.apache.org/project...
▼ Flink 社區推薦 ▼
史上超強陣容,Flink Forward Asia 2019 你報名了嗎?