Kafka 高性能吞吐揭祕

A high-throughput distributed messaging system.
--Apache Kafkajava

Kafka做爲時下最流行的開源消息系統,被普遍地應用在數據緩衝、異步通訊、聚集日誌、系統解耦等方面。相比較於RocketMQ等其餘常見消息系統,Kafka在保障了大部分功能特性的同時,還提供了超一流的讀寫性能。算法

本文將針對Kafka性能方面進行簡單分析,首先簡單介紹一下Kafka的架構和涉及到的名詞:apache

  1. Topic:用於劃分Message的邏輯概念,一個Topic能夠分佈在多個Broker上。緩存

  2. Partition:是Kafka中橫向擴展和一切並行化的基礎,每一個Topic都至少被切分爲1個Partition。安全

  3. Offset:消息在Partition中的編號,編號順序不跨Partition。網絡

  4. Consumer:用於從Broker中取出/消費Message。架構

  5. Producer:用於往Broker中發送/生產Message。併發

  6. Replication:Kafka支持以Partition爲單位對Message進行冗餘備份,每一個Partition均可以配置至少1個Replication(當僅1個Replication時即僅該Partition自己)。app

  7. Leader:每一個Replication集合中的Partition都會選出一個惟一的Leader,全部的讀寫請求都由Leader處理。其餘Replicas從Leader處把數據更新同步到本地,過程相似你們熟悉的MySQL中的Binlog同步。dom

  8. Broker:Kafka中使用Broker來接受Producer和Consumer的請求,並把Message持久化到本地磁盤。每一個Cluster當中會選舉出一個Broker來擔任Controller,負責處理Partition的Leader選舉,協調Partition遷移等工做。

  9. ISR(In-Sync Replica):是Replicas的一個子集,表示目前Alive且與Leader可以「Catch-up」的Replicas集合。因爲讀寫都是首先落到Leader上,因此通常來講經過同步機制從Leader上拉取數據的Replica都會和Leader有一些延遲(包括了延遲時間和延遲條數兩個維度),任意一個超過閾值都會把該Replica踢出ISR。每一個Partition都有它本身獨立的ISR。

以上幾乎是咱們在使用Kafka的過程當中可能遇到的全部名詞,同時也無一不是最核心的概念或組件,感受到從設計自己來講,Kafka仍是足夠簡潔的。此次本文圍繞Kafka優異的吞吐性能,逐個介紹一下其設計與實現當中所使用的各項「黑科技」。

Broker
不一樣於Redis和MemcacheQ等內存消息隊列,Kafka的設計是把全部的Message都要寫入速度低容量大的硬盤,以此來換取更強的存儲能力。實際上,Kafka使用硬盤並無帶來過多的性能損失,「規規矩矩」的抄了一條「近道」。

首先,說「規規矩矩」是由於Kafka在磁盤上只作Sequence I/O,因爲消息系統讀寫的特殊性,這並不存在什麼問題。關於磁盤I/O的性能,引用一組Kafka官方給出的測試數據(Raid-5,7200rpm):

Sequence I/O: 600MB/s
Random I/O: 100KB/s

因此經過只作Sequence I/O的限制,規避了磁盤訪問速度低下對性能可能形成的影響。

接下來咱們再聊一聊Kafka是如何「抄近道的」。

首先,Kafka重度依賴底層操做系統提供的PageCache功能。當上層有寫操做時,操做系統只是將數據寫入PageCache,同時標記Page屬性爲Dirty。當讀操做發生時,先從PageCache中查找,若是發生缺頁才進行磁盤調度,最終返回須要的數據。實際上PageCache是把儘量多的空閒內存都當作了磁盤緩存來使用。同時若是有其餘進程申請內存,回收PageCache的代價又很小,因此現代的OS都支持PageCache。

使用PageCache功能同時能夠避免在JVM內部緩存數據,JVM爲咱們提供了強大的GC能力,同時也引入了一些問題不適用與Kafka的設計。
• 若是在Heap內管理緩存,JVM的GC線程會頻繁掃描Heap空間,帶來沒必要要的開銷。若是Heap過大,執行一次Full GC對系統的可用性來講將是極大的挑戰。
• 全部在在JVM內的對象都難免帶有一個Object Overhead(千萬不可小視),內存的有效空間利用率會所以下降。
• 全部的In-Process Cache在OS中都有一份一樣的PageCache。因此經過將緩存只放在PageCache,能夠至少讓可用緩存空間翻倍。
• 若是Kafka重啓,全部的In-Process Cache都會失效,而OS管理的PageCache依然能夠繼續使用。

PageCache還只是第一步,Kafka爲了進一步的優化性能還採用了Sendfile技術。在解釋Sendfile以前,首先介紹一下傳統的網絡I/O操做流程,大致上分爲如下4步。

  1. OS 從硬盤把數據讀到內核區的PageCache。

  2. 用戶進程把數據從內核區Copy到用戶區。

  3. 而後用戶進程再把數據寫入到Socket,數據流入內核區的Socket Buffer上。

  4. OS 再把數據從Buffer中Copy到網卡的Buffer上,這樣完成一次發送。
    圖片描述

整個過程共經歷兩次Context Switch,四次System Call。同一份數據在內核Buffer與用戶Buffer之間重複拷貝,效率低下。其中二、3兩步沒有必要,徹底能夠直接在內核區完成數據拷貝。這也正是Sendfile所解決的問題,通過Sendfile優化後,整個I/O過程就變成了下面這個樣子。
圖片描述
經過以上的介紹不難看出,Kafka的設計初衷是盡一切努力在內存中完成數據交換,不管是對外做爲一整個消息系統,或是內部同底層操做系統的交互。若是Producer和Consumer之間生產和消費進度上配合得當,徹底能夠實現數據交換零I/O。這也就是我爲何說Kafka使用「硬盤」並無帶來過多性能損失的緣由。下面是我在生產環境中採到的一些指標。
(20 Brokers, 75 Partitions per Broker, 110k msg/s)
圖片描述
此時的集羣只有寫,沒有讀操做。10M/s左右的Send的流量是Partition之間進行Replicate而產生的。從recv和writ的速率比較能夠看出,寫盤是使用Asynchronous+Batch的方式,底層OS可能還會進行磁盤寫順序優化。而在有Read Request進來的時候分爲兩種狀況,第一種是內存中完成數據交換。
圖片描述
Send流量從平均10M/s增長到了到平均60M/s,而磁盤Read只有不超過50KB/s。PageCache下降磁盤I/O效果很是明顯。

接下來是讀一些收到了一段時間,已經從內存中被換出刷寫到磁盤上的老數據。
圖片描述
其餘指標仍是老樣子,而磁盤Read已經飈高到40+MB/s。此時所有的數據都已是走硬盤了(對硬盤的順序讀取OS層會進行Prefill PageCache的優化)。依然沒有任何性能問題。

Tips

  1. Kafka官方並不建議經過Broker端的log.flush.interval.messages和log.flush.interval.ms來強制寫盤,認爲數據的可靠性應該經過Replica來保證,而強制Flush數據到磁盤會對總體性能產生影響。

  2. 能夠經過調整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio來調優性能。

  3. 髒頁率超過第一個指標會啓動pdflush開始Flush Dirty PageCache。

  4. 髒頁率超過第二個指標會阻塞全部的寫操做來進行Flush。

  5. 根據不一樣的業務需求能夠適當的下降dirty_background_ratio和提升dirty_ratio。

Partition
Partition是Kafka能夠很好的橫向擴展和提供高併發處理以及實現Replication的基礎。

擴展性方面。首先,Kafka容許Partition在集羣內的Broker之間任意移動,以此來均衡可能存在的數據傾斜問題。其次,Partition支持自定義的分區算法,例如能夠將同一個Key的全部消息都路由到同一個Partition上去。 同時Leader也能夠在In-Sync的Replica中遷移。因爲針對某一個Partition的全部讀寫請求都是隻由Leader來處理,因此Kafka會盡可能把Leader均勻的分散到集羣的各個節點上,以避免形成網絡流量過於集中。

併發方面。任意Partition在某一個時刻只能被一個Consumer Group內的一個Consumer消費(反過來一個Consumer則能夠同時消費多個Partition),Kafka很是簡潔的Offset機制最小化了Broker和Consumer之間的交互,這使Kafka並不會像同類其餘消息隊列同樣,隨着下游Consumer數目的增長而成比例的下降性能。此外,若是多個Consumer恰巧都是消費時間序上很相近的數據,能夠達到很高的PageCache命中率,於是Kafka能夠很是高效的支持高併發讀操做,實踐中基本能夠達到單機網卡上限。

不過,Partition的數量並非越多越好,Partition的數量越多,平均到每個Broker上的數量也就越多。考慮到Broker宕機(Network Failure, Full GC)的狀況下,須要由Controller來爲全部宕機的Broker上的全部Partition從新選舉Leader,假設每一個Partition的選舉消耗10ms,若是Broker上有500個Partition,那麼在進行選舉的5s的時間裏,對上述Partition的讀寫操做都會觸發LeaderNotAvailableException。

再進一步,若是掛掉的Broker是整個集羣的Controller,那麼首先要進行的是從新任命一個Broker做爲Controller。新任命的Controller要從Zookeeper上獲取全部Partition的Meta信息,獲取每一個信息大概3-5ms,那麼若是有10000個Partition這個時間就會達到30s-50s。並且不要忘記這只是從新啓動一個Controller花費的時間,在這基礎上還要再加上前面說的選舉Leader的時間 -_-!!!!!!

此外,在Broker端,對Producer和Consumer都使用了Buffer機制。其中Buffer的大小是統一配置的,數量則與Partition個數相同。若是Partition個數過多,會致使Producer和Consumer的Buffer內存佔用過大。

Tips

  1. Partition的數量儘可能提早預分配,雖然能夠在後期動態增長Partition,可是會冒着可能破壞Message Key和Partition之間對應關係的風險。

  2. Replica的數量不要過多,若是條件容許儘可能把Replica集合內的Partition分別調整到不一樣的Rack。

  3. 盡一切努力保證每次停Broker時均可以Clean Shutdown,不然問題就不只僅是恢復服務所需時間長,還可能出現數據損壞或其餘很詭異的問題。

Producer
Kafka的研發團隊表示在0.8版本里用Java重寫了整個Producer,聽說性能有了很大提高。我尚未親自對比試用過,這裏就不作數據對比了。本文結尾的擴展閱讀裏提到了一套我認爲比較好的對照組,有興趣的同窗能夠嘗試一下。

其實在Producer端的優化大部分消息系統採起的方式都比較單一,無非也就化零爲整、同步變異步這麼幾種。

Kafka系統默認支持MessageSet,把多條Message自動地打成一個Group後發送出去,均攤後拉低了每次通訊的RTT。並且在組織MessageSet的同時,還能夠把數據從新排序,從爆發流式的隨機寫入優化成較爲平穩的線性寫入。

此外,還要着重介紹的一點是,Producer支持End-to-End的壓縮。數據在本地壓縮後放到網絡上傳輸,在Broker通常不解壓(除非指定要Deep-Iteration),直至消息被Consume以後在客戶端解壓。

固然用戶也能夠選擇本身在應用層上作壓縮和解壓的工做(畢竟Kafka目前支持的壓縮算法有限,只有GZIP和Snappy),不過這樣作反而會意外的下降效率!!!! Kafka的End-to-End壓縮與MessageSet配合在一塊兒工做效果最佳,上面的作法直接割裂了二者間聯繫。至於道理其實很簡單,壓縮算法中一條基本的原理「重複的數據量越多,壓縮比越高」。無關於消息體的內容,無關於消息體的數量,大多數狀況下輸入數據量大一些會取得更好的壓縮比。

不過Kafka採用MessageSet也致使在可用性上必定程度的妥協。每次發送數據時,Producer都是send()以後就認爲已經發送出去了,但其實大多數狀況下消息還在內存的MessageSet當中,還沒有發送到網絡,這時候若是Producer掛掉,那就會出現丟數據的狀況。

爲了解決這個問題,Kafka在0.8版本的設計借鑑了網絡當中的ack機制。若是對性能要求較高,又能在必定程度上容許Message的丟失,那就能夠設置request.required.acks=0 來關閉ack,以全速發送。若是須要對發送的消息進行確認,就須要設置request.required.acks爲1或-1,那麼1和-1又有什麼區別呢?這裏又要提到前面聊的有關Replica數量問題。若是配置爲1,表示消息只須要被Leader接收並確認便可,其餘的Replica能夠進行異步拉取無需當即進行確認,在保證可靠性的同時又不會把效率拉得很低。若是設置爲-1,表示消息要Commit到該Partition的ISR集合中的全部Replica後,才能夠返回ack,消息的發送會更安全,而整個過程的延遲會隨着Replica的數量正比增加,這裏就須要根據不一樣的需求作相應的優化。

Tips

  1. Producer的線程不要配置過多,尤爲是在Mirror或者Migration中使用的時候,會加重目標集羣Partition消息亂序的狀況(若是你的應用場景對消息順序很敏感的話)。

  2. 0.8版本的request.required.acks默認是0(同0.7)。

Consumer
Consumer端的設計大致上還算是比較常規的。

• 經過Consumer Group,能夠支持生產者消費者和隊列訪問兩種模式。
• Consumer API分爲High level和Low level兩種。前一種重度依賴Zookeeper,因此性能差一些且不自由,可是超省心。第二種不依賴Zookeeper服務,不管從自由度和性能上都有更好的表現,可是全部的異常(Leader遷移、Offset越界、Broker宕機等)和Offset的維護都須要自行處理。
• 你們能夠關注下不日發佈的0.9 Release。開發人員又用Java重寫了一套Consumer。把兩套API合併在一塊兒,同時去掉了對Zookeeper的依賴。聽說性能有大幅度提高哦~~

Tips
強烈推薦使用Low level API,雖然繁瑣一些,可是目前只有這個API能夠對Error數據進行自定義處理,尤爲是處理Broker異常或因爲Unclean Shutdown致使的Corrupted Data時,不然沒法Skip只能等着「壞消息」在Broker上被Rotate掉,在此期間該Replica將會一直處於不可用狀態。

擴展閱讀
Sendfile: https://www.ibm.com/developerworks/cn/java/j-zerocopy/
So what’s wrong with 1975 programming: https://www.varnish-cache.org/trac/wiki/ArchitectNotes
Benchmarking: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

相關文章
相關標籤/搜索