做者:張光輝前端
本文將爲你們展現字節跳動公司怎麼把Storm從Jstorm遷移到Flink的整個過程以及後續的計劃。你能夠藉此瞭解字節跳動公司引入Flink的背景以及Flink集羣的構建過程。字節跳動公司是如何兼容之前的Jstorm做業以及基於Flink作一個任務管理平臺的呢?本文將一一爲你揭開這些神祕的面紗。java
本文內容以下:node
下面這幅圖展現的是字節跳動公司的業務場景python
首先,應用層有廣告,也有AB測,也有推送和數據倉庫的一些業務。而後在使用J storm的過程當中,增長了一層模板主要應用於storm的計算模型,使用的語言是python。因此說中間相對抽象了一個schema,跑在最下面一層J storm計算引擎的上面。網絡
字節跳動公司有不少J-storm集羣,在當時17年7月份的時候,也就是在計劃遷移到Flink以前,J storm集羣的規模大概是下圖所示的規模級別,當時已經有5000臺機器左右了。併發
接下來,介紹下遷移Flink的整個過程。先詳細地介紹一下當時J-Storm是怎麼用的。運維
上面是一個word count的例子:左邊是一個目錄結構,這個目錄結構在resources下面,裏面的Spout/Bolt的邏輯都是一些python腳本寫的。而後在最外層還有一個topology_online.yaml配置文件。
這個配置文件是用來幹什麼的?就是把全部的Spout和Bolt串聯起來構成一個有向無關圖,也就是DAG圖。這就是使用J storm時的整個目錄結構,大部分用戶都是這樣用的。右邊是Spout和Bolt的邏輯,實際上是抽象出來了一個函數,就在這裏面寫業務方面的函數,而後將tuple_batch也就是上游流下來的數據去作一些計算邏輯。maven
下面詳細介紹一下配置文件的信息,其實咱們有整個拓撲結構拓撲的信息,好比說做業名叫什麼,做業須要多少資源,須要多少work數。這裏面會有單個的spout和Bolt的配置信息,好比是消費的topic仍是一些併發度?分佈式
除了這些信息還有整個這個數據流的流轉,好比說spout的輸出,輸出messsage的消息等等。最後還有整個的Spout到Bolt之間的shuffle邏輯。這就是咱們以前Jstorm的整個使用方式。最後會把整個目錄結構裏面的內容去解析出來,根據配置文件把整個storm的拓撲結構構建出來,而後提交到集羣上面去跑。函數
使用Jstorm集羣遇到了什麼問題呢?第一個問題,由於咱們當時是用使用python寫的代碼,整個集羣是沒有內存隔離的,job和work之間是沒有內存限制的。好比說在實際過程當中會常常遇到一個用戶,他可能代碼寫的有問題致使一個work可能佔了70G內存,把機器的內存佔了1/3。第二個問題就是說業務團隊之間沒有擴大管理,預算和審覈是無頭緒的。咱們當時都是都是跑在一個大集羣上面,而後個別業務是單獨跑在一些小集羣,可是咱們每次都是資源不足,也沒辦法梳理這個預算。
第三個問題就是集羣過多,運維平臺化作得不太好,都是靠人來運維的。這個時候集羣多了基本上是管不過來的。
第四個問題就是說咱們用python寫的代碼,有些性能比較差。可是咱們在Storm的基礎上面去推廣這個Java也比較難,由於咱們部分同事其實是不承認Java的,由於他以爲java開發速度太慢了。
咱們當時想解決上面的問題,一個思路是把Jstorm放在yarn上面,直接把Jstorm在yarn上面兼容作這一套。後來由於知道阿里在用Flink因此去調研Flink,發現了Flink的一些優點,因此想嘗試用Flink解決存在的問題。
使用Flink首先第一個問題能夠成功解決,由於Flink做業是跑在yarn上面的,這就解決了內存隔離的問題。而後Yarn也是支持隊列的,咱們能夠根據業務去劃分隊列,這樣咱們的擴大預算審覈的問題獲得解決了。咱們也不須要本身運維一個集羣了,由於有yarn去管理咱們的資源,這樣也節省了運維成員。在此基礎上還能夠作一些物理隔離隊列,其實物理隔離隊列如今也遇到了問題。由於物理隔離隊列只是說這個機器隔離了,可是至關因而機櫃也沒有隔離網絡帶寬也沒有隔離,因此說即便是物理隔離隊列,如今也遇到好比說和離線做業共用機櫃的時候,這個機櫃的出口帶寬被打滿的問題。針對這些問題,咱們後續可能想在這個離線離線集羣上面作QOS這種流量級別的方式來解決這個問題。
Flink其實是能夠兼容Storm的,好比說以前的歷史做業是能夠遷移過來的,不須要維護兩套計算引擎。Flink支持一些高優先級的API好比說支持SQL以及窗口等特性包括說checkpoint。咱們頭條的業務對exactly-once的需求不是特別的強烈。
以上就是Flink的優點,因而咱們就決定從J storm往Flink去遷移。
在遷移的過程當中,第一件事情是先把Flink集羣創建起來。一開始確定都是追求穩定性,好比說把離線的yarn集羣隔離開,而後不依賴於HDFS也能夠把Hdfs線上的name node, name space隔離出來。而後咱們梳理了原來storm上面的做業,哪些做業屬於不一樣的業務,而後映射到不一樣的隊列裏面去,最後把一些特殊的隊列也隔離開來。這是咱們準備這個Fink集羣的時候考慮的幾個點。
下面就考慮Flink怎麼兼容J storm,而後把它遷移過來。
咱們當時Flink用的是1.32版本,由於Flink有Flink-storm這個工程,它能把Storm做業轉化成Flink做業,咱們就借鑑這些技術上實現了一個Flink –jstorm。至關於把一個J storm的拓撲結構轉化成了一個Flink job。只作完這件事情是不夠的,由於咱們有一系列的外圍工具須要去對齊。好比說以前提交做業的時候是經過一個腳本提交的讓用戶去屏蔽一些其餘的參數。使用 flink的話咱們一樣也是須要構建這麼一個腳本,而後去提交Flink Job,最後中止flink Job。第三點是構建flink job外圍工具,自動註冊報警,好比說消費延遲報警,自動註冊這個Dashboard以及一些log service,全部的這些爲外圍工具都要和原來的服務去對齊。
對齊完以後,咱們須要構建一個遷移腳本,遷移的過程當中最困難的是資源配置這一塊。由於原來Storm用了多少資源,Storm怎麼配,這對於遷移的用戶來講,若是是第一次作確定是不瞭解這些東西。所以咱們寫這麼一個腳本,幫用戶生成它Flink集羣裏面對應的資源使用狀況。這些工做作完了以後,咱們就開始去遷移。到如今爲止,總體遷移完了,還剩下十個左右的做業沒有遷移完。如今集羣規模達到了大概是6000多臺。
在遷移的過程當中咱們有一些其餘優化,好比說J storm是可以支持task和work維度的重啓的,Flink這一塊作得不是特別好。咱們在這方面作了一些優化實現了一個single task和single tm粒度的重啓,這樣就解決部分做業由於task重啓致使整個做業所有重啓。
遷移完以後,咱們又構建了一個流式管理平臺。這個平臺是爲了解決實際過程當中遇到了一些問題,好比說整個機羣掛了沒法肯定哪些做業在上面跑着,也通知不到具體的用戶,有些用戶做業都不知道本身提交了哪些做業。咱們構建流式做業的時候目標實際上就是和其餘的管理平臺是同樣的,好比說咱們提供一些界面操做,而後提供一個版本管理,就是爲了方便方便用戶升級和回滾的操做,咱們還提供了一站式的查問題的工具:把一些用戶須要的信息都聚合在一個頁面上面,防止用戶不斷跳來跳去以及避免不一樣系統之間的切換。有一些歷史記錄以前不論是跑在yarn上面仍是跑到storm上面,我一個做業被別人kill到了,其實我都是不知道的。針對這個問題咱們提供了一些歷史操做記錄的一些目標。
設計這個管理平臺的時候,咱們考慮到提供這麼一個前端管理平臺可能只是針對公司內部的一部分產品,其餘的產品也作了本身的一套前端。他們能夠用一個模板,根據本身的邏輯去生成一個storm任務。基於此,咱們把整個管理平臺抽象了兩層:最上一層實際上至關於一個面向用戶或者說是相似於前端的一個產品。中間這一層其實是一個相似於提交做業調度任務,這一層只負責提任務,而後停任務,管理生命週期以及由於故障致使做業失敗了,將做業從新拉起來。這是中間層TSS層作的事情。
這樣,咱們就能夠對接到全部的前端平臺。經過一個RPC進行TSS通訊,就把全部的底層的服務和Filnk和Yarn還有HDFS這些交互的底層的邏輯徹底屏蔽開來了。
接下來,用戶寫一個做業就比較簡單了,流程以下:
第一步用戶先要生成本身的一個做業模板,咱們這邊經過maven提供的腳本架去生成一些做業的schema,這個做業執行完以後,它會把幫你把一些porm文件,還有一些相似於kafkasource這種常規的組件都幫你準備好,而後你直接在這個模板裏面填本身的主要邏輯就能夠了。由於咱們寫Java程序遇到最多的一個問題就是包衝突問題。因此porm文件幫助用戶把一些可能衝突的一些jar包都給以exclude掉,這樣包衝突的機率會愈來愈小。
咱們測試做業基本上是用IDEA或者local模式去測試,也提供了一個腳本去提交做業,經過這個腳本提交到stage環境上面。在提交註冊在平臺上面去註冊這個做業,而後添加一些配置信息。
下面是一個代碼版本管理的界面:
把整個做業提交以後以下圖所示:
提交完一個做業以後,用戶可能想看做業運行的狀態怎麼樣,咱們經過四種方式去給用戶展現他的做業運行狀態的。
第一個是Flink UI,也就是官方自帶的UI用戶能夠去看。第二個是Dashboard,咱們展現了做業裏面的task維度,QPS以及task之間的網絡buffer,這些重要的信息匯聚到一塊兒建立了一個Dashboard,這樣可能查問題的時候方便一些。第三個是錯誤日誌,其實和你們的思路同樣,把一個分佈式的日誌而後聚合起來,而後寫到ES上面去。第四是作了一個Jobtrace的工具,就是咱們把Flink裏面常見的一些異常匹配出來,而後直接給用戶一個wiki的使用指南,告訴用戶好比說你的做業OM了須要擴大內存。只要用戶的做業出現了某些問題,咱們把已知的全部的異常都會匹配給用戶。
下面是ES的kibana:
這是咱們Jobtrace的功能,咱們把Flink的這些常見的異常都匹配出來,每個異常其實對應了一個wiki而後去讓用戶去解決本身的問題。
最後分享下咱們的近期規劃,前面的基本作完而且趨於穩定了,可是如今又遇到了一些新的問題。好比資源使用率這個問題,由於用戶提交做業的時候,用戶對資源不是特別敏感就隨意把一個資源提上去了,可能他明明須要兩個CPU,可是他提了四個CPU。咱們想經過一個工具可以監控到他須要多少資源,而後通知yarn去把這個資源給重置了。就是動態調整job資源,自動把資源重置。
第二個問題是優化做業重啓速度。咱們這邊好多業務是根據流式計算的指標來監控它業務的穩定性,若是最上游重啓一個做業,底下一羣人收到報警說線上出現一些問題了。緣由是最上游某一個做業再重啓。咱們想把重啓時間間隔去作到最短或者是無縫重啓,這是下一階段須要去探索探索的一個問題。
第四點:Flink SQL也剛上線,可能須要一些精力投入去推廣。
最後一點,咱們但願在此抽象出更多的模式做業模型來,由於咱們自己是有一些好比說kafka2ES,kafka2hdfs這些需求,能不能把他們抽象成一個schema,而後去對外提供一些服務。
以上就是我本次分享的主要內容,感謝Flink的舉辦者和參與者,感謝咱們的同事,由於以上的分享內容是我和咱們同事一塊兒作的。
更多資訊請訪問 Apache Flink 中文社區網站