《Apache Kafka實戰》做者,北航計算機碩士畢業,現任某互金公司計算平臺總監,曾就任於IBM、搜狗、微博等公司。國內活躍的Kafka代碼貢獻者。後端
前言緩存
雖然目前Apache Kafka已經全面進化成一個流處理平臺,但大多數的用戶依然使用的是其核心功能:消息隊列。對於如何有效地監控和調優Kafka是一個大話題,不少用戶都有這樣的困擾,今天咱們就來討論一下。服務器
1、Kafka綜述網絡
在討論具體的監控與調優以前,我想用一張PPT圖來簡單說明一下當前Kafka生態系統的各個組件。就像我前面所說,Kafka目前已經進化成了一個流處理平臺,除了核心的消息隊列組件Kafka core以外,社區還新引入Kafka Connect和Kafka Streams兩個新的組件:其中前者負責Kafka與外部系統的數據傳輸;後者則負責對數據進行實時流處理計算。下圖羅列了一些關鍵的Kafka概念。數據結構
2、Kafka監控負載均衡
我打算從五個維度來討論Kafka的監控。首先是要監控Kafka集羣所在的主機;第二是監控Kafka broker JVM的表現;第三點,咱們要監控Kafka Broker的性能;第四,咱們要監控Kafka客戶端的性能。這裏的所指的是廣義的客戶端——多是指咱們本身編寫的生產者、消費者,也有多是社區幫咱們提供的生產者、消費者,好比說Connect的Sink/Source或Streams等;最後咱們須要監控服務器之間的交互行爲。框架
1.主機監控運維
我的認爲對於主機的監控是最重要的。由於不少線上環境問題首先表現出來的症狀就是主機的某些性能出現了明顯的問題。此時一般是運維人員首先發現了它們而後告訴咱們這臺機器有什麼問題,對於Kafka主機監控一般是發現問題的第一步。這一頁列出了常見的指標,包括CPU、內存、帶寬等數據。須要注意的是CPU使用率的統計。可能你們聽過這樣的提法:個人Kafka Broker CPU使用率是400%,怎麼回事?對於這樣的問題,咱們首先要搞清楚這個使用率是怎麼觀測出來的? 不少人拿top命令中的vss或rss字段來表徵CPU使用率,但實際上它們並非真正的CPU使用率——那只是全部CPU共同做用於Kafka進程所花的時間片的比例。舉個例子,若是機器上有16個CPU,那麼只要這些值沒有超過或接近1600, 那麼你的CPU使用率其實是很低的。所以要正確理解這些命令中各個字段的含義。
這頁PPT右邊給出了一本書,若是你們想監控主機性能的話,我我的建議這本《SystemsPerformance》就足夠了。很是權威的一本書,推薦你們讀一下。
2.監控JVM
Kafka自己是一個普通的Java進程,因此任何適用於JVM監控的方法對於監控Kafka都是相通的。第一步就是要先了解Kafka應用。比方說了解Kafka broker JVM的GC頻率和延時都是多少,每次GC後存活對象的大小是怎樣的等。瞭解了這些信息咱們才能明確後面調優的方向。固然,咱們畢竟不是特別資深的JVM專家,所以也沒必要過多追求繁複的JVM監控與調優。只須要關注大的方面便可。另外,若是你們時間頗有限但又想快速掌握JVM監控與調優,推薦閱讀《Java Performance》。
3.Per-Broker監控
首先要確保Broker進程是啓動狀態?這聽起來好像有點搞笑,但個人確遇到過這樣的狀況。好比當把Kafka部署在Docker上時就容易出現進程啓動但服務沒有成功啓動的情形。正常啓動下,一個Kafka服務器起來的時候,應該有兩個端口,一個端口是9092常規端口,會建一個TCP連接。還有一個端口是給JMX監控用的。固然有多臺broker的話,那麼controller機器會爲每臺broker都維護一個TCP鏈接。在實際監控時能夠有意識地驗證這一點。
對於Broker的監控,咱們主要是經過JMS指標來作的。用過Kafka的人知道,Kafka社區提供了特別多的JMS指標,其中不少指標用處不大。我這裏列了一些比較重要的:首先是broker機器每秒出入的字節數,就是相似於我能夠監控網卡的流量,必定要把這個指標監控起來,並實時與你的網卡帶寬進行比較——若是發現該值很是接近於帶寬的話,就證實broker負載太高,要麼增長新的broker機器,要麼把該broker上的負載均衡到其餘機器上。
另外還有兩個線程池空閒使用率小關注,最好確保它們的值都不要低於30%,不然說明Broker已經很是的繁忙。 此時須要調整線程池線程數。
接下來是監控broker服務器的日誌。日誌中包含了很是豐富的信息。這裏所說的日誌不只是broker服務器的日誌,還包括Kafka controller的日誌。咱們須要常常性地查看日誌中是否出現了OOM錯誤抑或是時刻關注日誌中拋出的ERROR信息。
咱們還須要監控一些關鍵後臺線程的運行狀態。我的認爲有兩個比較重要的線程須要監控:一個Log Cleaner線程——該線程是執行數據壓實操做的,若是該線程出問題了,用戶一般沒法感知到,而後會發現全部compact策略的topic會愈來愈大直到佔滿全部磁盤空間;另外一個線程就是副本拉取線程,即follower broker使用該線程實時從leader處拉取數據。若是該線程「掛掉」了,用戶一般也是不知道的,但會發現follower再也不拉取數據了。所以咱們必定要按期地查看這兩個線程的狀態,若是發現它們意味終止,則去找日誌中尋找對應的報錯信息。
4.Clients監控
客戶端監控這塊,我這邊會分爲兩個,分別討論對生產者和消費者的監控。生產者往Kafka發消息,在監控以前咱們至少要了解一下客戶端機器與Broker端機器之間的RTT是多少。對於那種跨數據中心或者是異地的狀況來講,RTT原本就很大,若是不作特殊的調優,是不可能有過高的TPS的。目前Kafka producer是雙線程的設計機制,分爲用戶主線程和Sender線程,當這個Sender線程掛了的時候,前端用戶是不感知的,但表現爲producer發送消息失敗,因此用戶最好監控一下這個Sender線程的狀態。
還有就是監控PRODUCE請求的處理延時。一條消息從生產者端發送到Kafka broker進行處理,以後返回給producer的總時間。整個鏈路中各個環節的耗時最好要作到心中有數。由於不少狀況下,若是你要提高生產者的TPS,瞭解整個鏈路中的瓶頸後才能作到有的放矢。後面PPT中我會討論如何拆解這條鏈路。
如今說說消費者。這裏的消費者說的是新版本的消費者,也就是java consumer。
社區已經很是不推薦再繼續使用老版本的消費者了。新版本的消費者也是雙線程設計,後面有一個心跳線程,若是這個線程掛掉的話,前臺線程是不知情的。因此,用戶最好按期監控該心跳線程的存活狀況。心跳線程按期發心跳請求給Kafka服務器,告訴Kafka,這個消費者實例還活着,以免coordinator錯誤地認爲此實例已「死掉」從而開啓rebalance。Kafka提供了不少的JMX指標能夠用於監控消費者,最重要的消費進度滯後監控,也就是所謂的consumerlag。
假設producer生產了100條消息,消費者讀取了80條,那麼lag就是20。顯然落後的越少越好,這代表消費者很是及時,用戶也能夠用工具行命令來查lag,甚至寫Java的API來查。與lag對應的還有一個lead指標,它表徵的是消費者領先第一條消息的進度。好比最先的消費位移是1,若是消費者當前消費的消息是10,那麼lead就是9。對於lead而言越大越好,不然代表此消費者可能處於停頓狀態或者消費的很是慢,本質上lead和lag是一回事,之因此列出來是由於lead指標是我開發的,也算打個廣告吧。
除了以上這些,咱們還須要監控消費者組的分區分配狀況,避免出現某個實例被分配了過多的分區,致使負載嚴重不平衡的狀況出現。通常來講,若是組內全部消費者訂閱的是相同的主題,那麼一般不會出現明顯的分配傾斜。一旦各個實例訂閱的主題不相同且每一個主題分區數良莠不齊時就極易發生這種不平衡的狀況。Kafka目前提供了3種策略來幫助用戶完成分區分配,最新的策略是黏性分配策略,它能保證絕對的公平,你們能夠去試一下。
最後就是要監控rebalance的時間——目前來看,組內超多實例的rebalance性能不好,可能都是小時級別的。並且比較悲劇的是當前無較好的解決方案。因此,若是你的Consumer特別特別多的話,必定會有這個問題,你監控一下兩個步驟所用的時間,看看是否知足需求,若是不能知足的話,看看能不能把消費者去除,儘可能減小消費者數量。
5.Inter-Broker監控
最後一個維度就是監控Broker之間的表現,主要是指副本拉取。Follower副本實時拉取leader處的數據,咱們天然但願這個拉取過程越快越好。Kafka提供了一個特別重要的JMX指標,叫作備份不足的分區數,好比說我規定了這條消息,應該在三個Broker上面保存,假設只有一個或者兩個Broker上保存該消息,那麼這條消息所在的分區就被稱爲「備份不足」的分區。這種狀況是特別關注的,由於有可能形成數據的丟失。《Kafka權威指南》一書中是這樣說的:若是你只能監控一個Kafka JMX指標,那麼就監控這個好了,確保在你的Kafka集羣中該值是永遠是0。一旦出現大於0的情形趕忙處理。
還有一個比較重要的指標是表徵controller個數的。整個集羣中應該確保只能有一臺機器的指標是1,其餘全應該是0,若是你發現有一臺機器是2或者是3,必定是出現腦裂了,此時應該去檢查下是否出現了網絡分區。Kafka自己是不能對抗腦裂的,徹底依靠Zookeeper來作,可是若是真正出現網絡分區的話,也是沒有辦法處理的,不如趕快fail fast掉。
3、監控工具
當前沒有一款Kafka監控工具是公認比較優秀的,每一個都有本身的特色但也有些致命的缺陷。咱們針對一些常見的監控工具逐個討論下。
1.Kafka Manager
應該說在全部免費的監控框架中,Kafka Manager是最受歡迎的。它最先由雅虎開源,功能很是齊全,展現的數據很是豐富。另外,用戶可以在界面上執行一些簡單的集羣管理操做。更加使人欣慰的是,該框架目前還在不斷維護中,所以使用Kafka manager來監控Kafka是一個不錯的選擇。
2.Burrow
Burrow是去年下半年開源,專門監控消費者信息的框架。這個框架剛開始開源的時候,我還對它仍是寄予厚望的,畢竟是Kafka社區committer親自編寫的。不過Burrow的問題在於沒有UI界面,不方便運維操做。另外因爲是Go語言寫的,你要用的話,必須搭建Go語言環境,而後編譯部署,總之用起來不是很方便。還有就是它的更新不是很頻繁,已經有點半荒廢的狀態,你們不妨一試。
3.Kafka Monitor
嚴格來講,它不是監控工具,它是專門作Kafka集羣系統性測試用的。待監控的指標能夠由用戶本身設定,主要是作一些端到端的測試。好比說你搭了一套Kafka集羣,想測試端到端的性能怎樣:從發消息到消費者讀取消息這一總體流程的性能。該框架的優點也是由Kafka社區團隊寫的,質量有保障,但更新不是很頻繁,目前好像幾個月沒有更新了。
4.Kafka Offset Monitor
KafkaOffsetMonitor是我用的最先的一個Kafka監控工具,也是監控消費者位移,只不過那時候Kafka把位移保持在Zookeepr上。這個框架的界面很是漂亮,國內用的人不少。可是如今有一個問題,由於咱們如今用了新版本的消費者,這個框架目前支持得的並非特別好。並且還有一個問題就是它已經再也不維護了,可能有1-2年沒有任何更新了。
5.Kafka Eagle
這是國人本身開發的,我不知道具體是哪一個大牛開發的,可是在Kafka QQ羣裏面不少人推崇,由於界面很乾淨漂亮,上面有很好的數據展示。
6.Confluent Control Center
Control Center是目前我能收集到的功能最齊全的Kafka監控框架了,只不過只有購買了Confluent企業版也有的,也就是說是付費的。
綜合來說,若是你是Kafka集羣運維操做人員,推薦先用Kafka Manager來作監控,後面再根據實際監控需求定製化開發特有的工具或框架。
4、系統調優
Kafka監控的一個主要的目的就是調優Kafka集羣。這裏羅列了一些常見的操做系統級的調優。
首先是保證頁緩存的大小——至少要設置頁緩存爲一個日誌段的大小。咱們知道Kafka大量使用頁緩存,只要保證頁緩存足夠大,那麼消費者讀取消息時就有大機率保證它可以直接命中頁緩存中的數據而無需從底層磁盤中讀取。故只要保證頁緩存要知足一個日誌段的大小。
第二是調優文件打開數。不少人對這個資源有點畏手畏腳。實際上這是一個很廉價的資源,設置一個比較大的初始值一般都是沒有什麼問題的。
第三是調優vm.max_map_count參數。主要適用於Kafka broker上的主題數超多的狀況。Kafka日誌段的索引文件是用映射文件的機制來作的,故若是有超多日誌段的話,這種索引文件數必然是不少的,極易打爆這個資源限制,因此對於這種狀況通常要適當調大這個參數。
第四是swap的設置。不少文章說把這個值設爲0,就是徹底禁止swap,我我的不建議這樣,由於當你設置成爲0的時候,一旦你的內存耗盡了,Linux會自動開啓OOM killer而後隨機找一個進程殺掉。這並非咱們但願的處理結果。相反,我建議設置該值爲一個比較接近零的較小值,這樣當個人內存快要耗盡的時候會嘗試開啓一小部分swap,雖然會致使broker變得很是慢,但至少給了用戶發現問題並處理之的機會。
第五JVM堆大小。首先鑑於目前Kafka新版本已經不支持Java7了,而Java 8自己不更新了,甚至Java9其實都不作了,直接作Java10了,因此我建議Kafka至少搭配Java8來搭建。至於堆的大小,我的認爲6-10G足矣。若是出現了堆溢出,就提jira給社區,讓他們看究竟是怎樣的問題。由於這種狀況下即便用戶調大heap size,也只是延緩OOM而已,不太可能從根本上解決問題。
最後,建議使用專屬的多塊磁盤來搭建Kafka集羣。自1.1版本起Kafka正式支持JBOD,所以不必在底層再使用一套RAID了。
5、Kafka調優的四個層面
Kafka調優一般能夠從4個維度展開,分別是吞吐量、延遲、持久性和可用性。在具體展開這些方面以前,我想先建議用戶保證客戶端與服務器端版本一致。若是版本不一致,就會出現向下轉化的問題。舉個例子,服務器端保存高版本的消息,當低版本消費者請求數據時,服務器端就要作轉化,先把高版本消息轉成低版本再發送給消費者。這件事情自己就很是很是低效。不少文章都討論過Kafka速度快的緣由,其中就談到了零拷貝技術——即數據不須要在頁緩存和堆緩存中來回拷貝。
簡單來講producer把生產的消息放到頁緩存上,若是兩邊版本一致,能夠直接把此消息推給Consumer,或者Consumer直接拉取,這個過程是不須要把消息再放到堆緩存。可是你要作向下轉化或者版本不一致的話,就要額外把數據再堆上,而後再放回到Consumer上,速度特別慢。
1.Kafka調優 – 吞吐量
調優吞吐量就是咱們想用更短的時間作更多的事情。這裏列出了客戶端須要調整的參數。前面說過了producer是把消息放在緩存區,後端Sender線程從緩存區拿出來發到broker。這裏面涉及到一個打包的過程,它是批處理的操做,不是一條一條發送的。所以這個包的大小就和TPS息息相關。一般狀況下調大這個值都會讓TPS提高,可是也不會無限制的增長。不過調高此值的劣處在於消息延遲的增長。除了調整batch.size,設置壓縮也能夠提高TPS,它可以減小網絡傳輸IO。當前Lz4的壓縮效果是最好的,若是客戶端機器CPU資源很充足那麼建議開啓壓縮。
對於消費者端而言,調優TPS並無太好的辦法,可以想到的就是調整fetch.min.bytes。適當地增長該參數的值可以提高consumer端的TPS。對於Broker端而言,一般的瓶頸在於副本拉取消息時間過長,所以能夠適當地增長num.replica.fetcher值,利用多個線程同時拉取數據,能夠加快這一進程。
2.Kafka調優 – 延時
所謂的延時就是指消息被處理的時間。某些狀況下咱們天然是但願越快越好。針對這方面的調優,consumer端能作的很少,簡單保持fetch.min.bytes默認值便可,這樣能夠保證consumer可以當即返回讀取到的數據。講到這裏,可能有人會有這樣的疑問:TPS和延時不是一回事嗎?假設發一條消息延時是2ms,TPS天然就是500了,由於一秒只能發500消息,其實這二者關係並非簡單的。由於我發一條消息2毫秒,可是若是把消息緩存起來統一發,TPS會提高不少。假設發一條消息依然是2ms,可是我先等8毫秒,在這8毫秒以內可能能收集到一萬條消息,而後我再發。至關於你在10毫秒內發了一萬條消息,你們能夠算一下TPS是多少。事實上,Kafka producer在設計上就是這樣的實現原理。
3.Kafka調優 –消息持久性
消息持久化本質上就是消息不丟失。Kafka對消息不丟失的承諾是有條件的。之前碰到不少人說我給Kafka發消息,發送失敗,消息丟失了,怎麼辦?嚴格來講Kafka不認爲這種狀況屬於消息丟失,由於此時消息沒有放到Kafka裏面。Kafka只對已經提交的消息作有條件的不丟失保障。
若是要調優持久性,對於producer而言,首先要設置重試以防止由於網絡出現瞬時抖動形成消息發送失敗。一旦開啓了重試,還須要防止亂序的問題。好比說我發送消息1與2,消息2發送成功,消息1發送失敗重試,這樣消息1就在消息2以後進入Kafka,也就是形成亂序了。若是用戶不容許出現這樣的狀況,那麼還須要顯式地設置max.in.flight.requests.per.connection爲1。
本頁PPT列出的其餘參數都是很常規的參數,好比unclean.leader.election.enable參數,最好仍是將其設置成false,即不容許「髒」副本被選舉爲leader。
4.Kafka調優 –可用性
最後是可用性,與剛纔的持久性是相反的,我容許消息丟失,只要保證系統高可用性便可。所以我須要把consumer心跳超時設置爲一個比較小的值,若是給定時間內消費者沒有處理完消息,該實例可能就被踢出消費者組。我想要其餘消費者更快地知道這個決定,所以調小這個參數的值。
6、定位性能瓶頸
下面就是性能瓶頸,嚴格來講這不是調優,這是解決性能問題。對於生產者來講,若是要定位發送消息的瓶頸很慢,咱們須要拆解發送過程當中的各個步驟。就像這張圖表示的那樣,消息的發送共有6步。第一步就是生產者把消息放到Broker,第2、三步就是Broker把消息拿到以後,寫到本地磁盤上,第四步是follower broker從Leader拉取消息,第五步是建立response;第六步是發送回去,告訴我已經處理完了。
這六步當中你須要肯定瓶頸在哪?怎麼肯定?——經過不一樣的JMX指標。好比說步驟1是慢的,可能你常常碰到超時,你若是在日誌裏面常常碰到request timeout,就表示1是很慢的,此時要適當增長超時的時間。若是二、3慢的狀況下,則可能體如今磁盤IO很是高,致使往磁盤上寫數據很是慢。假若是步驟4慢的話,查看名爲remote-time的JMX指標,此時能夠增長fetcher線程的數量。若是5慢的話,表現爲response在隊列致使待的時間過長,這時能夠增長網絡線程池的大小。6與1是同樣的,若是你發現一、6常常出問題的話,查一下你的網絡。因此,就這樣來分解整個的耗時。這是到底哪一步的瓶頸在哪,須要看看什麼樣的指標,作怎樣的調優。
7、Java Consumer的調優
最後說一下Consumer的調優。目前消費者有兩種使用方式,一種是同一個線程裏面就直接處理,另外一種是我採用單獨的線程,consumer線程只是作獲取消息,消息真正的處理邏輯放到單獨的線程池中作。這兩種方式有不一樣的使用場景:第一種方法實現較簡單,由於你的消息處理邏輯直接寫在一個線程裏面就能夠了,可是它的缺陷在於TPS可能不會很高,特別是當你的客戶端的機器很是強的時候,你用單線程處理的時候是很慢的,由於你沒有充分利用線程上的CPU資源。第二種方法的優點是可以充分利用底層服務器的硬件資源,TPS能夠作的很高,可是處理提交位移將會很難。
最後說一下參數,也是網上問的最多的,這幾個參數究竟是作什麼的。第一個參數,就是控制consumer單次處理消息的最大時間。好比說設定的是600s,那麼consumer給你10分鐘來處理。若是10分鐘內consumer沒法處理完成,那麼coordinator就會認爲此consumer已死,從而開啓rebalance。
Coordinator是用來管理消費者組的協調者,協調者如何在有效的時間內,把消費者實例掛掉的消息傳遞給其餘消費者,就靠心跳請求,所以能夠設置heartbeat.interval.ms爲一個較小的值,好比5s。
8、Q & A
Q1:胡老師在前面提到低版本與高版本有一個端口的問題,我想問一下高版本的、低版本的會有這個問題嗎?
A1:會有。
Q2:兩種模式,一個是Consumer怎麼作到全部的partition,在裏面作管理的。會有一個問題,某個Consumer的消費比較慢,由於全部的Partition的消費都是綁定在一個線程。一個消費比較慢,一個消費比較快,要等另外一個。有沒有一種方案,消費者比較慢的能夠暫定,若是涉及到暫停的話,頻繁的暫定耗費的時間,是否是會比較慢?
A2:一個線程處理全部的分區。若是從開銷來說並不大,可是的確會出現像你說的,若是一個消費者定了100個分區,目前我這邊看到的效果,某段時間內有可能會形成某些分區的餓死,好比說某些分區長期得不到數據,可能有一些分區不停的有數據,這種狀況下的確有可能狀況。可是你說的兩種方法自己開銷不是很大,由於它就是內存當中的結構變動,就是定位信息,若是segment,就把定位信息先暫時關掉,不涉及到很複雜的數據結構的變動。
Q3:怎麼決定順序呢?
A3:這個事情如今在Broker端作的,簡單會作輪詢,好比說有100個分區,第一批隨機給你一批分區,以後這些分區會排到整個隊列的末尾,從其餘的分區開始給你,作到儘可能的公平。
Q4:消費的時候會出現數據傾斜的狀況,這塊如何理解?
A4:數據傾斜。這種狀況下發生在每一個消費者訂閱信息不同的狀況下,特別容易出現數據傾斜。好比說我訂閱主題123,我訂閱主題456,咱們又在同一個組裏面這些主題分區數極不相同,頗有可能出現我訂閱了10個分區,你可能訂閱2個分區。若是你用的是有粘性的分配策略,那種保證不會出現超過兩個以上相差的狀況。這個策略推出的時間也不算短了,是0.11版本推出來的。