kafka數據可靠性深度解讀

1 概述

Kakfa起初是由LinkedIn公司開發的一個分佈式的消息系統,後成爲Apache的一部分,它使用Scala編寫,以可水平擴展和高吞吐率而被普遍使用。目前愈來愈多的開源分佈式處理系統如Cloudera、Apache Storm、Spark等都支持與Kafka集成。前端

Kafka憑藉着自身的優點,愈來愈受到互聯網企業的青睞,惟品會也採用Kafka做爲其內部核心消息引擎之一。Kafka做爲一個商業級消息中間件,消息可靠性的重要性可想而知。如何確保消息的精確傳輸?如何確保消息的準確存儲?如何確保消息的正確消費?這些都是須要考慮的問題。本文首先從Kafka的架構着手,先了解下Kafka的基本原理,而後經過對kakfa的存儲機制、複製原理、同步原理、可靠性和持久性保證等等一步步對其可靠性進行分析,最後經過benchmark來加強對Kafka高可靠性的認知。java


2 Kafka體系架構

這裏寫圖片描述

如上圖所示,一個典型的Kafka體系架構包括若干Producer(能夠是服務器日誌,業務數據,頁面前端產生的page view等等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干Consumer (Group),以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push(推)模式將消息發佈到broker,Consumer使用pull(拉)模式從broker訂閱並消費消息。web

名詞解釋:算法

名稱 解釋
Broker 消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker能夠組成一個Kafka集羣
Topic Kafka根據topic對消息進行歸類,發佈到Kafka集羣的每條消息都須要指定一個topic
Producer 消息生產者,向Broker發送消息的客戶端
Consumer 消息消費者,從Broker讀取消息的客戶端
ConsumerGroup 每一個Consumer屬於一個特定的Consumer Group,一條消息能夠發送到多個不一樣的Consumer Group,可是一個Consumer Group中只能有一個Consumer可以消費該消息
Partition 物理上的概念,一個topic能夠分爲多個partition,每一個partition內部是有序的

2.1 Topic & Partition

一個topic能夠認爲一個一類消息,每一個topic將被分紅多個partition,每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset爲一個long型的數字,它惟一標記一條消息。每條消息都被append到partition中,是順序寫磁盤,所以效率很是高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。apache

這裏寫圖片描述

每一條消息被髮送到broker中,會根據partition規則選擇被存儲到哪個partition。若是partition規則設置的合理,全部消息能夠均勻分佈到不一樣的partition裏,這樣就實現了水平擴展。(若是一個topic對應一個文件,那這個文件所在的機器I/O將會成爲這個topic的性能瓶頸,而partition解決了這個問題)。在建立topic時能夠在$KAFKA_HOME/config/server.properties中指定這個partition的數量(以下所示),固然能夠在topic建立以後去修改partition的數量。緩存

1
2
3
4
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions= 3

 

在發送一條消息時,能夠指定這個消息的key,producer根據這個key和partition機制來判斷這個消息發送到哪一個partition。partition機制能夠經過指定producer的partition.class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。安全

有關Topic與Partition的更多細節,能夠參考下面的「Kafka文件存儲機制」這一節。服務器


3 高可靠性存儲分析

Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。經過調節其副本相關參數,可使得Kafka在性能和可靠性之間運轉的遊刃有餘。Kafka從0.8.x版本開始提供partition級別的複製,replication的數量能夠在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)。網絡

這裏先從Kafka文件存儲機制入手,從最底層瞭解Kafka的存儲細節,進而對其的存儲有個微觀的認知。以後經過Kafka複製原理和同步方式來闡述宏觀層面的概念。最後從ISR,HW,leader選舉以及數據可靠性和持久性保證等等各個維度來豐富對Kafka相關知識點的認知。架構

3.1 Kafka文件存儲機制

Kafka中消息是以topic進行分類的,生產者經過topic向Kafka broker發送消息,消費者經過topic讀取數據。然而topic在物理層面又能以partition爲分組,一個topic能夠分紅若干個partition,那麼topic以及partition又是怎麼存儲的呢?partition還能夠細分爲segment,一個partition物理上由多個segment組成,那麼這些segment又是什麼呢?下面咱們來一一揭曉。

爲了便於說明問題,假設這裏只有一個Kafka集羣,且這個集羣只有一個Kafka broker,即只有一臺物理機。在這個Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs,以此來設置Kafka消息文件存儲目錄,與此同時建立一個topic:topic_zzh_test,partition的數量爲4($KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_zzh_test –replication-factor 1)。那麼咱們此時能夠在/tmp/kafka-logs目錄中能夠看到生成了4個目錄:

1
2
3
4
drwxr-xr-x 2 root root 4096 Apr 10 16 : 10 topic_zzh_test- 0
drwxr-xr-x 2 root root 4096 Apr 10 16 : 10 topic_zzh_test- 1
drwxr-xr-x 2 root root 4096 Apr 10 16 : 10 topic_zzh_test- 2
drwxr-xr-x 2 root root 4096 Apr 10 16 : 10 topic_zzh_test- 3

在Kafka文件存儲中,同一個topic下有多個不一樣的partition,每一個partiton爲一個目錄,partition的名稱規則爲:topic名稱+有序序號,第一個序號從0開始計,最大的序號爲partition數量減1,partition是實際物理上的概念,而topic是邏輯上的概念。

上面提到partition還能夠細分爲segment,這個segment又是什麼?若是就以partition爲最小存儲單位,咱們能夠想象當Kafka producer不斷髮送消息,必然會引發partition文件的無限擴張,這樣對於消息文件的維護以及已經被消費的消息的清理帶來嚴重的影響,因此這裏以segment爲單位又將partition細分。每一個partition(目錄)至關於一個巨型文件被平均分配到多個大小相等的segment(段)數據文件中(每一個segment 文件中消息數量不必定相等)這種特性也方便old segment的刪除,即方便已被消費的消息的清理,提升磁盤的利用率。每一個partition只須要支持順序讀寫就行,segment的文件生命週期由服務端配置參數(log.segment.bytes,log.roll.{ms,hours}等若干參數)決定。

segment文件由兩部分組成,分別爲「.index」文件和「.log」文件,分別表示爲segment索引文件和數據文件。這兩個文件的命令規則爲:partition全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值,數值大小爲64位,20位數字字符長度,沒有數字用0填充,以下:

1
2
3
4
5
6
00000000000000000000 .index
00000000000000000000 .log
00000000000000170410 .index
00000000000000170410 .log
00000000000000239430 .index
00000000000000239430 .log
 

 

以上面的segment文件爲例,展現出segment:00000000000000170410的「.index」文件和「.log」文件的對應的關係,以下圖:

這裏寫圖片描述

如上圖,「.index」索引文件存儲大量的元數據,「.log」數據文件存儲大量的消息,索引文件中的元數據指向對應數據文件中message的物理偏移地址。其中以「.index」索引文件中的元數據[3, 348]爲例,在「.log」數據文件表示第3個消息,即在全局partition中表示170410+3=170413個消息,該消息的物理偏移地址爲348。

那麼如何從partition中經過offset查找message呢? 
以上圖爲例,讀取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index爲最開始的文件,第二個文件爲00000000000000170410.index(起始偏移爲170410+1=170411),而第三個文件爲00000000000000239430.index(起始偏移爲239430+1=239431),因此這個offset=170418就落到了第二個文件之中。其餘後續文件能夠依次類推,以其實偏移量命名並排列這些文件,而後根據二分查找法就能夠快速定位到具體文件位置。其次根據00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進行讀取。

要是讀取offset=170418的消息,從00000000000000170410.log文件中的1325的位置進行讀取,那麼怎麼知道什麼時候讀完本條消息,不然就讀到下一條消息的內容了? 
這個就須要聯繫到消息的物理結構了,消息都具備固定的物理結構,包括:offset(8 Bytes)、消息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,能夠肯定一條消息的大小,即讀取到哪裏截止。

3.2 複製原理和同步方式

Kafka中topic的每一個partition有一個預寫式的日誌文件,雖然partition能夠繼續細分爲若干個segment文件,可是對於上層應用來講能夠將partition當作最小的存儲單元(一個有多個segment文件拼接的「巨型」文件),每一個partition都由一些列有序的、不可變的消息組成,這些消息被連續的追加到partition中。

這裏寫圖片描述

上圖中有兩個新名詞:HW和LEO。這裏先介紹下LEO,LogEndOffset的縮寫,表示每一個partition的log最後一條Message的位置。HW是HighWatermark的縮寫,是指consumer可以看到的此partition的位置,這個涉及到多副本的概念,這裏先說起一下,下節再詳表。

言歸正傳,爲了提升消息的可靠性,Kafka每一個topic的partition有N個副本(replicas),其中N(大於等於1)是topic的複製因子(replica fator)的個數。Kafka經過多副本機制實現故障自動轉移,當Kafka集羣中一個broker失效狀況下仍然保證服務可用。在Kafka中發生複製時確保partition的日誌能有序地寫到其餘節點上,N個replicas中,其中一個replica爲leader,其餘都爲follower, leader處理partition的全部讀寫請求,與此同時,follower會被動按期地去複製leader上的數據。

以下圖所示,Kafka集羣中有4個broker, 某topic有3個partition,且複製因子即副本個數也爲3:

這裏寫圖片描述

Kafka提供了數據複製算法保證,若是leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本爲leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節)中全部follower滯後的狀態。當producer發送一條消息到broker後,leader寫入消息並複製到全部follower。消息提交以後才被成功複製到全部的同步副本。消息複製延遲受最慢的follower限制,重要的是快速檢測慢副本,若是follower「落後」太多或者失效,leader將會把它從ISR中刪除。

3.3 ISR

上節咱們涉及到ISR (In-Sync Replicas),這個是指副本同步隊列。副本數對Kafka的吞吐率是有必定的影響,但極大的加強了可用性。默認狀況下Kafka的replica數量爲1,即每一個partition都有一個惟一的leader,爲了確保消息的可靠性,一般應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置爲大於1,好比3。 全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。

Kafka 0.9.0.0版本後移除了replica.lag.max.messages參數,只保留了replica.lag.time.max.ms做爲ISR中副本管理的參數。爲何這樣作呢?replica.lag.max.messages表示當前某個副本落後leader的消息數量超過了這個參數的值,那麼leader就會把follower從ISR中刪除。假設設置replica.lag.max.messages=4,那麼若是producer一次傳送至broker的消息數量都小於4條時,由於在leader接受到producer發送的消息以後而follower副本開始拉取這些消息以前,follower落後leader的消息數不會超過4條消息,故此沒有follower移出ISR,因此這時候replica.lag.max.message的設置彷佛是合理的。可是producer發起瞬時高峯流量,producer一次發送的消息超過4條時,也就是超過replica.lag.max.messages,此時follower都會被認爲是與leader副本不一樣步了,從而被踢出了ISR。但實際上這些follower都是存活狀態的且沒有性能問題。那麼在以後追上leader,並被從新加入了ISR。因而就會出現它們不斷地剔出ISR而後從新迴歸ISR,這無疑增長了無謂的性能損耗。並且這個參數是broker全局的。設置太大了,影響真正「落後」follower的移除;設置的過小了,致使follower的頻繁進出。沒法給定一個合適的replica.lag.max.messages的值,故此,新版本的Kafka移除了這個參數。

注:ISR中包括:leader和follower。

上面一節還涉及到一個概念,即HW。HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO做爲HW,consumer最多隻能消費到HW所在的位置。另外每一個replica都有HW,leader和follower各自負責更新本身的HW的狀態。對於leader新寫入的消息,consumer不能馬上消費,leader會等待該消息被全部ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了若是leader所在的broker失效,該消息仍然能夠重新選舉的leader中獲取。對於來自內部broker的讀取請求,沒有HW的限制。

下圖詳細的說明了當producer生產消息至broker後,ISR以及HW和LEO的流轉過程:

這裏寫圖片描述

因而可知,Kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。事實上,同步複製要求全部能工做的follower都複製完,這條消息纔會被commit,這種複製方式極大的影響了吞吐率。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認爲已經commit,這種狀況下若是follower都尚未複製完,落後於leader時,忽然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。

Kafka的ISR的管理最終都會反饋到Zookeeper節點上。具體位置爲:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個Zookeeper的節點進行維護:

  1. Controller來維護:Kafka集羣中的其中一個Broker會被選舉爲Controller,主要負責Partition管理和副本狀態管理,也會執行相似於重分配partition之類的管理任務。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節點中。同時發起LeaderAndIsrRequest通知全部的replicas。
  2. leader來維護:leader有單獨的線程按期檢測ISR中follower是否脫離ISR, 若是發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。

3.4 數據可靠性和持久性保證

當producer向leader發送數據時,能夠經過request.required.acks參數來設置數據可靠性的級別:

  • 1(默認):這意味着producer在ISR中的leader已成功收到數據並獲得確認。若是leader宕機了,則會丟失數據。
  • 0:這意味着producer無需等待來自broker的確認而繼續發送下一批消息。這種狀況下數據傳輸效率最高,可是數據可靠性確是最低的。
  • -1:producer須要等待ISR中的全部follower都確認接收到數據後纔算一次發送完成,可靠性最高。可是這樣也不能保證數據不丟失,好比當ISR中只有leader時(前面ISR那一節講到,ISR中的成員因爲某些狀況會增長也會減小,最少就只剩一個leader),這樣就變成了acks=1的狀況。

若是要提升數據的可靠性,在設置request.required.acks=-1的同時,也要min.insync.replicas這個參數(能夠在broker或者topic層面進行設置)的配合,這樣才能發揮最大的功效。min.insync.replicas這個參數設定ISR中的最小副本數是多少,默認值爲1,當且僅當request.required.acks參數設置爲-1時,此參數才生效。若是ISR中的副本數少於min.insync.replicas配置的數量時,客戶端會返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

接下來對acks=1和-1的兩種狀況進行詳細分析:

1. request.required.acks=1

producer發送數據到leader,leader寫本地日誌成功,返回客戶端成功;此時ISR中的副本尚未來得及拉取該消息,leader就宕機了,那麼這次發送的消息就會丟失。

這裏寫圖片描述

2. request.required.acks=-1

同步(Kafka默認爲同步,即producer.type=sync)的發送模式,replication.factor>=2且min.insync.replicas>=2的狀況下,不會丟失數據。

有兩種典型狀況。acks=-1的狀況下(如無特殊說明,如下acks都表示爲參數request.required.acks),數據發送到leader, ISR的follower所有完成數據同步後,leader此時掛掉,那麼會選舉出新的leader,數據不會丟失。

這裏寫圖片描述

acks=-1的狀況下,數據發送到leader後 ,部分ISR的副本同步,leader此時掛掉。好比follower1和follower2都有可能變成新的leader, producer端會獲得返回異常,producer端會從新發送數據,數據可能會重複。

這裏寫圖片描述

固然上圖中若是在leader crash的時候,follower2尚未同步到任何數據,並且follower2被選舉爲新的leader的話,這樣消息就不會重複。

注:Kafka只處理fail/recover問題,不處理Byzantine問題。

3.5 關於HW的進一步探討

考慮上圖(即acks=-1,部分ISR副本同步)中的另外一種狀況,若是在Leader掛掉的時候,follower1同步了消息4,5,follower2同步了消息4,與此同時follower2被選舉爲leader,那麼此時follower1中的多出的消息5該作如何處理呢?

這裏就須要HW的協同配合了。如前所述,一個partition中的ISR列表中,leader的HW是全部ISR列表裏副本中最小的那個的LEO。相似於木桶原理,水位取決於最低那塊短板。

這裏寫圖片描述

如上圖,某個topic的某partition有三個副本,分別爲A、B、C。A做爲leader確定是LEO最高,B緊隨其後,C機器因爲配置比較低,網絡比較差,故而同步最慢。這個時候A機器宕機,這時候若是B成爲leader,假如沒有HW,在A從新恢復以後會作同步(makeFollower)操做,在宕機時log文件以後直接作追加操做,而假如B的LEO已經達到了A的LEO,會產生數據不一致的狀況,因此使用HW來避免這種狀況。 
A在作同步操做的時候,先將log文件截斷到以前本身的HW的位置,即3,以後再從B中拉取消息進行同步。

若是失敗的follower恢復過來,它首先將本身的log文件截斷到上次checkpointed時刻的HW的位置,以後再從leader中同步消息。leader掛掉會從新選舉,新的leader會發送「指令」讓其他的follower截斷至自身的HW的位置而後再拉取新的消息。

當ISR中的個副本的LEO不一致時,若是此時leader掛掉,選舉新的leader時並非按照LEO的高低進行選舉,而是按照ISR中的順序選舉。

3.6 Leader選舉

一條消息只有被ISR中的全部follower都從leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而形成數據丟失。而對於producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks來設置。這種機制確保了只要ISR中有一個或者以上的follower,一條被commit的消息就不會丟失。

有一個很重要的問題是當leader宕機了,怎樣在follower中選舉出新的leader,由於follower可能落後不少或者直接crash了,因此必須確保選擇「最新」的follower做爲新的leader。一個基本的原則就是,若是leader不在了,新的leader必須擁有原來的leader commit的全部消息。這就須要作一個折中,若是leader在一個消息被commit前等待更多的follower確認,那麼在它掛掉以後就有更多的follower能夠成爲新的leader,但這也會形成吞吐率的降低。

一種很是經常使用的選舉leader的方式是「少數服從多數」,Kafka並非採用這種方式。這種模式下,若是咱們有2f+1個副本,那麼在commit以前必須保證有f+1個replica複製完消息,同時爲了保證能正確選舉出新的leader,失敗的副本數不能超過f個。這種方式有個很大的優點,系統的延遲取決於最快的幾臺機器,也就是說好比副本數爲3,那麼延遲就取決於最快的那個follower而不是最慢的那個。「少數服從多數」的方式也有一些劣勢,爲了保證leader選舉的正常進行,它所能容忍的失敗的follower數比較少,若是要容忍1個follower掛掉,那麼至少要3個以上的副本,若是要容忍2個follower掛掉,必需要有5個以上的副本。也就是說,在生產環境下爲了保證較高的容錯率,必需要有大量的副本,而大量的副本又會在大數據量下致使性能的急劇降低。這種算法更多用在Zookeeper這種共享集羣配置的系統中而不多在須要大量數據的系統中使用的緣由。HDFS的HA功能也是基於「少數服從多數」的方式,可是其數據存儲並非採用這樣的方式。

實際上,leader選舉的算法很是多,好比Zookeeper的ZabRaft以及Viewstamped Replication。而Kafka所使用的leader選舉算法更像是微軟的PacificA算法。

Kafka在Zookeeper中爲每個partition動態的維護了一個ISR,這個ISR裏的全部replica都跟上了leader,只有ISR裏的成員纔能有被選爲leader的可能(unclean.leader.election.enable=false)。在這種模式下,對於f+1個副本,一個Kafka topic能在保證不丟失已經commit消息的前提下容忍f個副本的失敗,在大多數使用場景下,這種模式是十分有利的。事實上,爲了容忍f個副本的失敗,「少數服從多數」的方式和ISR在commit前須要等待的副本的數量是同樣的,可是ISR須要的總的副本的個數幾乎是「少數服從多數」的方式的一半。

上文提到,在ISR中至少有一個follower時,Kafka能夠確保已經commit的數據不丟失,但若是某一個partition的全部replica都掛了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:

  1. 等待ISR中任意一個replica「活」過來,而且選它做爲leader
  2. 選擇第一個「活」過來的replica(並不必定是在ISR中)做爲leader

這就須要在可用性和一致性當中做出一個簡單的抉擇。若是必定要等待ISR中的replica「活」過來,那不可用的時間就可能會相對較長。並且若是ISR中全部的replica都沒法「活」過來了,或者數據丟失了,這個partition將永遠不可用。選擇第一個「活」過來的replica做爲leader,而這個replica不是ISR中的replica,那即便它並不保障已經包含了全部已commit的消息,它也會成爲leader而做爲consumer的數據源。默認狀況下,Kafka採用第二種策略,即unclean.leader.election.enable=true,也能夠將此參數設置爲false來啓用第一種策略。

unclean.leader.election.enable這個參數對於leader的選舉、系統的可用性以及數據的可靠性都有相當重要的影響。下面咱們來分析下幾種典型的場景。

這裏寫圖片描述

若是上圖所示,假設某個partition中的副本數爲3,replica-0, replica-1, replica-2分別存放在broker0, broker1和broker2中。AR=(0,1,2),ISR=(0,1)。 
設置request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false。這裏將broker0中的副本也稱之爲broker0起初broker0爲leader,broker1爲follower。

  • 當ISR中的replica-0出現crash的狀況時,broker1選舉爲新的leader[ISR=(1)],由於受min.insync.replicas=2影響,write不能服務,可是read能繼續正常服務。此種狀況恢復方案:

    1. 嘗試恢復(重啓)replica-0,若是能起來,系統正常;
    2. 若是replica-0不能恢復,須要將min.insync.replicas設置爲1,恢復write功能。
  • 當ISR中的replica-0出現crash,緊接着replica-1也出現了crash, 此時[ISR=(1),leader=-1],不能對外提供服務,此種狀況恢復方案:

    1. 嘗試恢復replica-0和replica-1,若是都能起來,則系統恢復正常;
    2. 若是replica-0起來,而replica-1不能起來,這時候仍然不能選出leader,由於當設置unclean.leader.election.enable=false時,leader只能從ISR中選舉,當ISR中全部副本都失效以後,須要ISR中最後失效的那個副本能恢復以後才能選舉leader, 即replica-0先失效,replica-1後失效,須要replica-1恢復後才能選舉leader。保守的方案建議把unclean.leader.election.enable設置爲true,可是這樣會有丟失數據的狀況發生,這樣能夠恢復read服務。一樣須要將min.insync.replicas設置爲1,恢復write功能;
    3. replica-1恢復,replica-0不能恢復,這個狀況上面遇到過,read服務可用,須要將min.insync.replicas設置爲1,恢復write功能;
    4. replica-0和replica-1都不能恢復,這種狀況能夠參考情形2.
  • 當ISR中的replica-0, replica-1同時宕機,此時[ISR=(0,1)],不能對外提供服務,此種狀況恢復方案:嘗試恢復replica-0和replica-1,當其中任意一個副本恢復正常時,對外能夠提供read服務。直到2個副本恢復正常,write功能才能恢復,或者將將min.insync.replicas設置爲1。

3.7 Kafka的發送模式

Kafka的發送模式由producer端的配置參數producer.type來設置,這個參數指定了在後臺線程中消息的發送方式是同步的仍是異步的,默認是同步的方式,即producer.type=sync。若是設置成異步的模式,即producer.type=async,能夠是producer以batch的形式push數據,這樣會極大的提升broker的性能,可是這樣會增長丟失數據的風險。若是須要確保消息的可靠性,必需要將producer.type設置爲sync。

對於異步模式,還有4個配套的參數,以下:

Property Description
queue.buffering.max.ms 默認值:5000。啓用異步模式時,producer緩存消息的時間。好比咱們設置成1000時,它會緩存1s的數據再一次發送出去,這樣能夠極大的增長broker吞吐量,但也會形成時效性的下降。
queue.buffering.max.messages 默認值:10000。啓用異步模式時,producer緩存隊列裏最大緩存的消息數量,若是超過這個值,producer就會阻塞或者丟掉消息。
queue.enqueue.timeout.ms 默認值:-1。當達到上面參數時producer會阻塞等待的時間。若是設置爲0,buffer隊列滿時producer不會阻塞,消息直接被丟掉;若設置爲-1,producer會被阻塞,不會丟消息。
batch.num.messages 默認值:200。啓用異步模式時,一個batch緩存的消息數量。達到這個數值時,producer纔會發送消息。(每次批量發送的數量)

以batch的方式推送數據能夠極大的提升處理效率,kafka producer能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。batch的數量大小能夠經過producer的參數(batch.num.messages)控制。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。在比較新的版本中還有batch.size這個參數。


4 高可靠性使用分析

4.1 消息傳輸保障

前面已經介紹了Kafka如何進行有效的存儲,以及瞭解了producer和consumer如何工做。接下來討論的是Kafka如何確保消息在producer和consumer之間傳輸。有如下三種可能的傳輸保障(delivery guarantee):

  • At most once: 消息可能會丟,但毫不會重複傳輸
  • At least once:消息毫不會丟,但可能會重複傳輸
  • Exactly once:每條消息確定會被傳輸一次且僅傳輸一次

Kafka的消息傳輸保障機制很是直觀。當producer向broker發送消息時,一旦這條消息被commit,因爲副本機制(replication)的存在,它就不會丟失。可是若是producer發送數據給broker後,遇到的網絡問題而形成通訊中斷,那producer就沒法判斷該條消息是否已經提交(commit)。雖然Kafka沒法肯定網絡故障期間發生了什麼,可是producer能夠retry屢次,確保消息已經正確傳輸到broker中,因此目前Kafka實現的是at least once。

consumer從broker中讀取消息後,能夠選擇commit,該操做會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit以後的開始位置相同。固然也能夠將consumer設置爲autocommit,即consumer一旦讀取到數據當即自動commit。若是隻討論這一讀取消息的過程,那Kafka是確保了exactly once, 可是若是因爲前面producer與broker之間的某種緣由致使消息的重複,那麼這裏就是at least once。

考慮這樣一種狀況,當consumer讀完消息以後先commit再處理消息,在這種模式下,若是consumer在commit後還沒來得及處理消息就crash了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於at most once了。

讀完消息先處理再commit。這種模式下,若是處理完了消息在commit以前consumer crash了,下次從新開始工做時還會處理剛剛未commit的消息,實際上該消息已經被處理過了,這就對應於at least once。

要作到exactly once就須要引入消息去重機制。

4.2 消息去重

如上一節所述,Kafka在producer端和consumer端都會出現消息的重複,這就須要去重處理。

Kafka文檔中說起GUID(Globally Unique Identifier)的概念,經過客戶端生成算法獲得每一個消息的unique id,同時可映射至broker上存儲的地址,即經過GUID即可查詢提取消息內容,也便於發送方的冪等性保證,須要在broker上提供此去重處理模塊,目前版本尚不支持。

針對GUID, 若是從客戶端的角度去重,那麼須要引入集中式緩存,必然會增長依賴複雜度,另外緩存的大小難以界定。

不僅是Kafka, 相似RabbitMQ以及RocketMQ這類商業級中間件也只保障at least once, 且也沒法從自身去進行消息去重。因此咱們建議業務方根據自身的業務特色進行去重,好比業務消息自己具有冪等性,或者藉助Redis等其餘產品進行去重處理。

4.3 高可靠性配置

Kafka提供了很高的數據冗餘彈性,對於須要數據高可靠性的場景,咱們能夠增長數據冗餘備份數(replication.factor),調高最小寫入副本數的個數(min.insync.replicas)等等,可是這樣會影響性能。反之,性能提升而可靠性則下降,用戶須要自身業務特性在彼此之間作一些權衡性選擇。

要保證數據寫入到Kafka是安全的,高可靠的,須要以下的配置:

  • topic的配置:replication.factor>=3,即副本數至少是3個;2<=min.insync.replicas<=replication.factor
  • broker的配置:leader的選舉條件unclean.leader.election.enable=false
  • producer的配置:request.required.acks=-1(all),producer.type=sync

5 BenchMark

Kafka在惟品會有着很深的歷史淵源,根據惟品會消息中間件團隊(VMS團隊)所掌握的資料顯示,在VMS團隊運轉的Kafka集羣中所支撐的topic數已接近2000,天天的請求量也已達千億級。這裏就以Kafka的高可靠性爲基準點來探究幾種不一樣場景下的行爲表現,以此來加深對Kafka的認知,爲你們在之後高效的使用Kafka時提供一份依據。

5.1 測試環境

Kafka broker用到了4臺機器,分別爲broker[0/1/2/3]配置以下:

  • CPU: 24core/2.6GHZ
  • Memory: 62G
  • Network: 4000Mb
  • OS/kernel: CentOs release 6.6 (Final)
  • Disk: 1089G
  • Kafka版本:0.10.1.0

broker端JVM參數設置: 
-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

客戶端機器配置:

  • CPU: 24core/2.6GHZ
  • Memory: 3G
  • Network: 1000Mb
  • OS/kernel: CentOs release 6.3 (Final)
  • Disk: 240G

5.2 不一樣場景測試

場景1:測試不一樣的副本數、min.insync.replicas策略以及request.required.acks策略(如下簡稱acks策略)對於發送速度(TPS)的影響。

具體配置:一個producer;發送方式爲sync;消息體大小爲1kB;partition數爲12。副本數爲:1/2/4;min.insync.replicas分別爲1/2/4;acks分別爲-1(all)/1/0。

具體測試數據以下表(min.insync.replicas只在acks=-1時有效):

acks replicas min.insync.replicas retries TPS
-1 1 1 0 28511.3
-1 2 1 0 22359.5
-1 2 2 0 22927.4
-1 4 1 0 16193.9
-1 4 2 0 16599.9
-1 4 4 0 16680.3
0 1 N/A 0 45353.8
0 2 N/A 0 46426.5
0 4 N/A 0 46764.2
1 1 N/A 0 33950.3
1 2 N/A 0 32192.2
1 4 N/A 0 32275.9

測試結果分析:

  • 客戶端的acks策略對發送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1;
  • 副本數越高,TPS越低;副本數一致時,min.insync.replicas不影響TPS;
  • acks=0/1時,TPS與min.insync.replicas參數以及副本數無關,僅受acks策略的影響。

下面將partition的個數設置爲1,來進一步確認下不一樣的acks策略、不一樣的min.insync.replicas策略以及不一樣的副本數對於發送速度的影響,詳細請看情景2和情景3。

場景2:在partition個數固定爲1,測試不一樣的副本數和min.insync.replicas策略對發送速度的影響。

具體配置:一個producer;發送方式爲sync;消息體大小爲1kB;producer端acks=-1(all)。變換副本數:2/3/4; min.insync.replicas設置爲:1/2/4。

測試結果以下:

replicas min.insync.replicas TPS
2 1 9738.8
2 2 9701.6
3 1 8999.7
3 2 9243.1
4 1 9005.8
4 2 8216.9
4 4 9092.4

測試結果分析:副本數越高,TPS越低(這點與場景1的測試結論吻合),可是當partition數爲1時差距甚微。min.insync.replicas不影響TPS。

場景3:在partition個數固定爲1,測試不一樣的acks策略和副本數對發送速度的影響。

具體配置:一個producer;發送方式爲sync;消息體大小爲1kB;min.insync.replicas=1。topic副本數爲:1/2/4;acks: 0/1/-1。

測試結果以下:

replicas acks TPS
1 0 76696
2 0 57503
4 0 59367
1 1 19489
2 1 20404
4 1 18365
1 -1 18641
2 -1 9739
4 -1 9006

測試結果分析(與情景1一致):

  • 副本數越多,TPS越低;
  • 客戶端的acks策略對發送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1。

場景4:測試不一樣partition數對發送速率的影響

具體配置:一個producer;消息體大小爲1KB;發送方式爲sync;topic副本數爲2;min.insync.replicas=2;acks=-1。partition數量設置爲1/2/4/8/12。

測試結果:

這裏寫圖片描述

測試結果分析:partition的不一樣會影響TPS,隨着partition的個數的增加TPS會有所增加,但並非一直成正比關係,到達必定臨界值時,partition數量的增長反而會使TPS略微下降。

場景5:經過將集羣中部分broker設置成不可服務狀態,測試對客戶端以及消息落盤的影響。

具體配置:一個producer;消息體大小1KB;發送方式爲sync;topic副本數爲4;min.insync.replicas設置爲2;acks=-1;retries=0/100000000;partition數爲12。

具體測試數據以下表:

acks replicas min.insync.replicas retries 測試方法 TPS 數據落盤 出現錯誤
-1 4 2 0 發送過程當中kill兩臺broker 12840 一致(部分數據可落盤,部分失敗) 錯誤1
-1 4 2 100000000 發送過程當中kill兩臺broker 13870 一致(消息有重複落盤) 錯誤2
-1 4 2 100000000 發送過程當中kill三臺broker,以後重啓 N/A 一致(消息有重複落盤) 錯誤二、三、4

出錯信息:

  • 錯誤1:客戶端返回異常,部分數據可落盤,部分失敗:org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • 錯誤2:[WARN]internals.Sender - Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
  • 錯誤3: [WARN]internals.Sender - Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
  • 錯誤4: [WARN]internals.Sender - Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND

測試結果分析:

  • kill兩臺broker後,客戶端能夠繼續發送。broker減小後,partition的leader分佈在剩餘的兩臺broker上,形成了TPS的減少;
  • kill三臺broker後,客戶端沒法繼續發送。Kafka的自動重試功能開始起做用,當大於等於min.insync.replicas數量的broker恢復後,能夠繼續發送;
  • 當retries不爲0時,消息有重複落盤;客戶端成功返回的消息都成功落盤,異常時部分消息能夠落盤。

場景6:測試單個producer的發送延遲,以及端到端的延遲。

具體配置::一個producer;消息體大小1KB;發送方式爲sync;topic副本數爲4;min.insync.replicas設置爲2;acks=-1;partition數爲12。

測試數據及結果(單位爲ms):

發送端(avg) 發送端(min) 發送端(max) 發送端(99%) 發送端(99.99%) 消費端(avg) 消費端(min) 消費端(max) 消費端(99%) 消費端(99.99%)
1.715 1 157 3 29 1.646 1 288 4 72

各場景測試總結:

    • 當acks=-1時,Kafka發送端的TPS受限於topic的副本數量(ISR中),副本越多TPS越低;
    • acks=0時,TPS最高,其次爲1,最差爲-1,即TPS:acks_0 > acks_1 > ack_-1;
    • min.insync.replicas參數不影響TPS;
    • partition的不一樣會影響TPS,隨着partition的個數的增加TPS會有所增加,但並非一直成正比關係,到達必定臨界值時,partition數量的增長反而會使TPS略微下降;
    • Kafka在acks=-1,min.insync.replicas>=1時,具備高可靠性,全部成功返回的消息均可以落盤。
相關文章
相關標籤/搜索