kafka的高可用和一致性探究

1 kafka基礎

本篇文章討論的kafka版本是目前最新版 0.10.1.0。java

1.1 kafka種的KafkaController

全部broker會經過ZooKeeper選舉出一個做爲KafkaController,來負責:apache

  • 監控全部broker的存活,以及向他們發送相關的執行命令。
  • 分區的狀態維護:負責分區的新增、下線等,分區副本的leader選舉
  • 副本的狀態維護:負責副本的新增、下線等

1.2 kafka分區中的基本概念

每一個分區能夠有多個副本,分散在不一樣的broker上。緩存

  • leader副本:被KafkaController選舉出來的,做爲該分區的leader服務器

  • 其餘follower副本:其餘副本都做爲follower副本微信

  • isr列表:簡單描述就是,"跟得上"leader的副本列表(包含leader),最開始是全部副本。這裏的跟得上是指多線程

    • replica.lag.time.max.ms:在0.9.0.0以前表示follower若是在此時間間隔內沒有向leader發送fetch請求,則該follower就會被剔除isr列表,在0.9.0.0以後表示若是該follower在此時間間隔內一直沒有追上過leader的全部消息,則該follower就會被剔除isr列表
    • replica.lag.max.messages(0.9.0.0版本中已被廢除):follower若是落後leader的消息個數超過該值,則該follower就會被剔除isr列表 廢除的主要緣由是:目前這個配置是個統一配置,不一樣的topic速率生產速率不太同樣,沒辦法來指定一個具體的值來應用到全部的topic上。未來能夠將這個配置下放到topic級別,關於這個問題,能夠見這裏的討論Automate replica lag tuning

每個producer發送消息給某個分區的leader副本,其餘follower副本又會複製該消息。producer端有一個acks參數能夠設置:異步

  • acks=0:表示producer不須要leader發送響應,即producer只管發無論發送成功與否。延遲低,容易丟失數據。
  • acks=1:表示leader寫入成功(可是並無刷新到磁盤)後即向producer響應。延遲中等,一旦leader副本掛了,就會丟失數據。
  • acks=-1:表示leader會等待isr列表中全部副本都寫入成功才向producer發送響應。延遲高、可靠性高。可是也會丟數據,下面會詳細討論

同時對於isr列表的數量要求也有一個配置fetch

  • min.insync.replicas:默認是1。當acks=-1的時候,leader在處理新消息前,會先判斷當前isr列表的的size是否小於這個值,若是小於的話,則不容許寫入,返回NotEnoughReplicasException異常。同時,一旦容許寫入了以後,在響應producer以前也會判斷當前isr列表的size是否小於該值,若是小於返回NotEnoughReplicasAfterAppendException異常

咱們本篇文章就重點經過kafka的原理來揭示在acks=-1的狀況下,哪些狀況下會丟失數據,或許能夠提一些改進措施來作到不丟失數據。線程

下面會先介紹下leader和follower副本複製的原理scala

1.3 副本複製過程

  • leader副本的屬性

    • highWatermarkMetadata:表明已經被isr列表複製的最大offset,consumer只能消費該offset以前的數據
    • logEndOffsetMetadata:表明leader副本上已經複製的最大offset

    leader副本擁有其餘副本的記錄,保存着他們的以下屬性:

    • logEndOffsetMetadata:表明該follower副本已經複製的最大offset
    • lastCaughtUpTimeMs:記錄該follower副本上一次追上leader副本的全部消息的時間
  • follower副本的屬性

    • highWatermarkMetadata:follower會獲取到leader的highWatermarkMetadata更新到本身的該屬性中
    • logEndOffsetMetadata:表明follower副本上已經複製的最大offset

    其中follower會不斷的向leader發送fetch請求,若是沒有數據fetch則被leader阻塞一段時間,等待新數據的來臨,一旦來臨則解除阻塞,複製數據給follower。

咱們來看下當producer的acks=-1時,一次消息寫入的整個過程,上述是屬性是怎麼變化的

  • 1.3.1 消息準備寫入leader副本,leader副本首先判斷當前isr列表是否小於min.insync.replicas,不小於才容許寫入。

    若是不小於,leader寫入到本身的log中,獲得該消息的offset,而後對其餘follower的fetch請求解除阻塞,複製必定量的消息給follower

    同時leader將本身最新的highWatermarkMetadata傳給follower

    同時會判斷此次複製是否複製到leader副本的末尾了,即logEndOffsetMetadata位置,若是是的話,則更新上述的lastCaughtUpTimeMs

  • 1.3.2 follower會將fetch來的數據寫入到本身的log中,本身的logEndOffsetMetadata獲得了更新,同時更新本身的highWatermarkMetadata,就是取leader傳來的highWatermarkMetadata和本身的logEndOffsetMetadata中的最小值

    而後follower再一次向leader發送fetch請求,fetch的初始offset就是本身的logEndOffsetMetadata+1。

  • 1.3.3 leader副本收到該fetch後,會更新leader副本中該follower的logEndOffsetMetadata爲上述fetch的offset,同時會對全部的isr列表的logEndOffsetMetadata排序獲得最小的logEndOffsetMetadata做爲最新的highWatermarkMetadata

    若是highWatermarkMetadata已經大於了leader寫入該消息的offset了,說明該消息已經被isr列表都複製過了,則leader開始迴應producer

    判斷當前isr列表的size是否小於min.insync.replicas,若是小於返回NotEnoughReplicasAfterAppendException異常,不小於則表明正常寫入了。

  • 1.3.4 follower在下一次的fetch請求的響應中就會獲得leader最新的highWatermarkMetadata,更新本身的highWatermarkMetadata

1.4 leader副本選舉

若是某個broker掛了,leader副本在該broker上的分區就要從新進行leader選舉。來簡要描述下leader選舉的過程

  • 1.4.1 KafkaController會監聽ZooKeeper的/brokers/ids節點路徑,一旦發現有broker掛了,執行下面的邏輯。這裏暫時先不考慮KafkaController所在broker掛了的狀況,KafkaController掛了,各個broker會從新leader選舉出新的KafkaController

  • 1.4.2 leader副本在該broker上的分區就要從新進行leader選舉,目前的選舉策略是

    • 1.4.2.1 優先從isr列表中選出第一個做爲leader副本

    • 1.4.2.2 若是isr列表爲空,則查看該topic的unclean.leader.election.enable配置。

      unclean.leader.election.enable:爲true則表明容許選用非isr列表的副本做爲leader,那麼此時就意味着數據可能丟失,爲false的話,則表示不容許,直接拋出NoReplicaOnlineException異常,形成leader副本選舉失敗。

    • 1.4.2.3 若是上述配置爲true,則從其餘副本中選出一個做爲leader副本,而且isr列表只包含該leader副本。

    一旦選舉成功,則將選舉後的leader和isr和其餘副本信息寫入到該分區的對應的zk路徑上。

  • 1.4.3 KafkaController向上述相關的broker上發送LeaderAndIsr請求,將新分配的leader、isr、所有副本等信息傳給他們。同時將向全部的broker發送UpdateMetadata請求,更新每一個broker的緩存的metadata數據。

  • 1.4.4 若是是leader副本,更新該分區的leader、isr、全部副本等信息。若是本身以前就是leader,則如今什麼操做都不用作。若是以前不是leader,則需將本身保存的全部follower副本的logEndOffsetMetadata設置爲UnknownOffsetMetadata,以後等待follower的fetch,就會進行更新

  • 1.4.5 若是是follower副本,更新該分區的leader、isr、全部副本等信息

    而後將日誌截斷到本身保存的highWatermarkMetadata位置,即日誌的logEndOffsetMetadata等於了highWatermarkMetadata

    最後建立新的fetch請求線程,向新leader不斷髮送fetch請求,初次fetch的offset是logEndOffsetMetadata。

上述重點就是leader副本的日誌不作處理,而follower的日誌則須要截斷到highWatermarkMetadata位置。

至此,算是簡單描述了分區的基本狀況,下面就針對上述過程來討論下kafka分區的高可用和一致性問題。

2 消息丟失

2.1 消息丟失的場景

哪些場景下會丟失消息?

  • acks= 0、1,很明顯都存在消息丟失的可能。

  • 即便設置acks=-1,當isr列表爲空,若是unclean.leader.election.enable爲true,則會選擇其餘存活的副本做爲新的leader,也會存在消息丟失的問題。

  • 即便設置acks=-1,當isr列表爲空,若是unclean.leader.election.enable爲false,則不會選擇其餘存活的副本做爲新的leader,即犧牲了可用性來防止上述消息丟失問題。

  • 即便設置acks=-1,而且選出isr中的副本做爲leader的時候,仍然是會存在丟數據的狀況的:

    s1 s2 s3是isr列表,還有其餘副本爲非isr列表,s1是leader,一旦某個日誌寫入到s1 s2 s3,則s1將highWatermarkMetadata提升,並回復了客戶端ok,可是s2 s3的highWatermarkMetadata可能還沒被更新,此時s1掛了,s2當選leader了,s2的日誌不變,可是s3就要截斷日誌了,這時已經回覆客戶端的日誌是沒有丟的,由於s2已經複製了。

    可是若是此時s2一旦掛了,s3當選,則s3上就不存在上述日誌了(前面s2當選leader的時候s3已經將日誌截斷了),這時候就形成日誌丟失了。

2.2 不丟消息的探討

其實咱們是但願上述最後一個場景可以作到不丟消息的,可是目前的作法仍是可能會丟消息的。

丟消息最主要的緣由是:

因爲follower的highWatermarkMetadata相對於leader的highWatermarkMetadata是延遲更新的,當leader選舉完成後,全部follower副本的截斷到本身的highWatermarkMetadata位置,則可能截斷了已被老leader提交了的日誌,這樣的話,這部分日誌僅僅存在新的leader副本中,在其餘副本中消失了,一旦leader副本掛了,這部分日誌就完全丟失了

這個截斷到highWatermarkMetadata的操做的確太狠了,可是它的用途有一個就是:避免了日誌的不一致的問題。經過每次leader選舉以後的日誌截斷,來達到和leader之間日誌的一致性,避免出現日誌錯亂的狀況。

ZooKeeper和Raft的實現也有相似的日誌複製的問題,那ZooKeeper和Raft的實現有沒有這種問題呢?他們是如何解決的呢?

Raft並不進行日誌的截斷操做,而是會經過每第二天志複製時的一致性檢查來進行日誌的糾正,達到和leader來保持一致的目的。不截斷日誌,那麼對於已經提交的日誌,則必然存在過半的機器上從而可以保證日誌基本是不會丟失的。

ZooKeeper只有當某個follower的記錄超出leader的部分纔會截斷,其餘的不會截斷的。選舉出來的leader是通過過半pk的,必然是包含所有已經被提交的日誌的,即便該leader掛了,再次從新選舉,因爲不進行日誌截斷,仍然是能夠選出其餘包含所有已提交的日誌的(有過半的機器都包含所有已提交的日誌)。ZooKeeper對於日誌的糾正則是在leader選舉完成後專門開啓一個糾正過程。

kafka的截斷到highWatermarkMetadata的確有點太粗暴了,若是不截斷日誌,則須要解決日誌錯亂的問題,即便不可以像ZooKeeper那樣花大代價專門開啓一個糾正過程,能夠像Raft那樣每次在fetch的時候能夠進行不斷的糾正。這一塊還有待繼續關注。

3 順序性

kafka目前是隻能保證一個分區內的數據是有序的。

可是你可能常常據說,一旦某個broker掛了,就可能產生亂序問題(也沒人指出亂序的緣由),是否正確呢?

首先來看看如何能保證單個分區內消息的有序性,有以下幾個過程:

  • 3.1 producer按照消息的順序進行發送

    不少時候爲了發送效率,採用的辦法是多線程、異步、批量發送。

    若是爲了保證順序,則不能使用多線程來執行發送任務。

    異步:通常是把消息先發到一個隊列中,由後臺線程不斷的執行發送任務。這種方式對消息的順序也是有影響的:

    如先發送消息1,後發送消息2,此時服務器端在處理消息1的時候返回了異常,可能在處理消息2的時候成功了,此時若再重試消息1就會形成消息亂序的問題。因此producer端須要先確認消息1發送成功了才能執行消息2的發送。

    對於kafka來講,目前是異步、批量發送。解決異步的上述問題就是配置以下屬性:

    max.in.flight.requests.per.connection=1

    即producer發現一旦還有未確認發送成功的消息,則後面的消息不容許發送。

  • 3.2 相同key的消息可以hash到相同的分區

    正常狀況下是沒問題的,可是一旦某個分區掛了,如本來總共4個分區,此時只有3個分區存活,在此分區恢復的這段時間內,是否會存在hash錯亂到別的分區?

    那就要看producer端獲取的metadata信息是否會立馬更新成3個分區。目前看來應該是不會的

    producer見到的metadata數據是各個broker上的緩存數據,這些緩存數據是由KafkaController來統一進行更新的。一旦leader副本掛了,KafkaController並不會去立馬更新成3個分區,而是去執行leader選舉,選舉完成後纔會去更新metadata數據,此時選舉完成後仍然是保證4個分區的,也就是說producer是不可能獲取到只有3個分區的metadata數據的,因此producer端仍是能正常hash的,不會錯亂分區的。

    在整個leader選舉恢復過程,producer最可能是沒法寫入數據(後期能夠重試)。

  • 3.3 系統對順序消息的支持

    leader副本按照消息到來的前後順序寫入本地日誌的,一旦寫入日誌後,該順序就肯定了,follower副本也是按照該順序進行復制的。對於消息的提交也是按照消息的offset從低到高來確認提交的,因此說kafka對於消息的處理是順序的。

  • 3.4 consumer可以按照消息的順序進行消費

    爲了接收的效率,可能會使用多線程進行消費。這裏爲了保證順序就只能使用單線程來進行消費了。

    目前kafka的Consumer有scala版本的和java版本的(這一塊以後再詳細探討),最新的java版本,對用戶提供一個poll方法,用戶本身去決定是使用多線程仍是單線程。

4 其餘話題

  • 如何看待kafka的isr列表設計?和過半怎麼對比呢?

    對於相同數量的2n個follower加一個leader,過半呢則容許n個follower掛掉,而isr呢則容許2n個follower掛掉(可是會存在丟失消息的問題),因此過半更多會犧牲可用性(掛掉一半以上就不可用了)來加強數據的一致性,而isr會犧牲一致性來加強可用性(掛掉一半以上扔可以使用,可是存在丟數據的問題)

    可是在確認效率上:過半僅僅須要最快的n+1的寫入成功便可斷定爲成功,而isr則須要2n+1的寫入成功纔算成功。同時isr是動態變化的過程,一旦跟不上或者跟上了都會離開或者加入isr列表。isr列表越小寫入速度就會加快。

  • 有哪些環節會形成消息的重複消費?若是避免不了,如何去減小重複?

    • producer端重複發送

      producer端因發送超時等等緣由作重試操做,目前broker端作重複請求的判斷仍是很難的,目前kafka也沒有去作,而是存儲完消息以後,若是開啓了Log compaction,它會經過kafka消息中的key來斷定是不是重複消息,是的話則會刪除。

    • consumer消費後,未及時提交消費的offset便掛了,下次恢復後就會重複消費

      這個目前來講並無通用的解決辦法,先消費後提交offset可能會重複,先提交offset後消費可能形成消息丟失,因此通常仍是優先保證消息不丟,在業務上去作去重判斷。

本文章涉及到的話題不少,不免有疏漏的地方,還請批評指正。

歡迎繼續來討論,越辯越清晰

歡迎關注微信公衆號:乒乓狂魔

乒乓狂魔微信公衆號

相關文章
相關標籤/搜索