在kafka中主要經過ISR機制來保證消息的可靠性。 下面經過幾個問題來講明kafka如何來保證消息可靠性與一致性算法
在kafka中ISR是什麼?
在zk中會保存AR(Assigned Replicas)列表,其中包含了分區全部的副本,其中 AR = ISR+OSR網絡
- ISR(in sync replica):是kafka動態維護的一組同步副本,在ISR中有成員存活時,只有這個組的成員才能夠成爲leader,內部保存的爲每次提交信息時必須同步的副本(acks = all時),每當leader掛掉時,在ISR集合中選舉出一個follower做爲leader提供服務,當ISR中的副本被認爲壞掉的時候,會被踢出ISR,當從新跟上leader的消息數據時,從新進入ISR。
- OSR(out sync replica): 保存的副本沒必要保證必須同步完成才進行確認,OSR內的副本是否同步了leader的數據,不影響數據的提交,OSR內的follower盡力的去同步leader,可能數據版本會落後。
kafka如何控制須要同步多少副本才能夠返回肯定到生產者消息纔可用?
- 當寫入到kakfa時,生產者能夠選擇是否等待0(只需寫入leader),1(只需同步一個副本) 或 -1(所有副本)的消息確認(這裏的副本指的是ISR中的副本)。
- 須要注意的是「全部副本確認」並不能保證所有分配副本已收到消息。默認狀況下,當acks=all時,只要當前全部在同步中的副本(ISR中的副本)收到消息,就會進行確認。因此Kafka的交付承諾能夠這樣理解:對沒有提交成功的消息不作任何交付保證,而對於ISR中至少有一個存活的徹底同步的副本的狀況下的「成功提交」的消息保證不會丟失。
對於kafka節點活着的條件是什麼?
- 第一點:一個節點必須維持和zk的會話,經過zk的心跳檢測實現
- 第二點:若是節點是一個slave也就是複製節點,那麼他必須複製leader節點不能太落後。這裏的落後能夠指兩種狀況
- 1:數據複製落後,slave節點和leader節點的數據相差較大,這種狀況有一個缺點,在生產者忽然發送大量消息致使網絡堵塞後,大量的slav複製受阻,致使數據複製落後被大量的踢出ISR。
- 2:時間相差過大,指的是slave向leader請求複製的時間距離上次請求相隔時間過大。經過配置
replica.lag.time.max
就能夠配置這個時間參數。這種方式解決了上述第一種方式致使的問題。
kafka分區partition掛掉以後如何恢復?
在kafka中有一個partition recovery機制用於恢復掛掉的partition。分佈式
每一個Partition會在磁盤記錄一個RecoveryPoint(恢復點), 記錄已經flush到磁盤的最大offset。當broker fail 重啓時,會進行loadLogs。 首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint點上的segment及之後的segment, 這些segment就是可能沒有徹底flush到磁盤segments。而後調用segment的recover,從新讀取各個segment的msg,並重建索引。.net
優勢:日誌
- 以segment爲單位管理Partition數據,方便數據生命週期的管理,刪除過時數據簡單
- 在程序崩潰重啓時,加快recovery速度,只需恢復未徹底flush到磁盤的segment便可
什麼緣由致使副本與leader不一樣步的呢?
- 慢副本:在必定週期時間內follower不能追遇上leader。最多見的緣由之一是I / O瓶頸致使follower追加複製消息速度慢於從leader拉取速度。
- 卡住副本:在必定週期時間內follower中止從leader拉取請求。follower replica卡住了是因爲GC暫停或follower失效或死亡。
- 新啓動副本:當用戶給主題增長副本因子時,新的follower不在同步副本列表中,直到他們徹底遇上了leader日誌。
一個partition的follower落後於leader足夠多時,被認爲不在同步副本列表或處於滯後狀態。code
正如上述所說,如今kafka斷定落後有兩種,副本滯後判斷依據是副本落後於leader最大消息數量(replica.lag.max.messages)或replicas響應partition leader的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,然後者是用來檢測失效或死亡的副本cdn
若是ISR內的副本掛掉怎麼辦?
- 兩種選擇:服務直接不可用一段時間等待ISR中副本恢復(祈禱恢復的副本有數據吧) 或者 直接選用第一個副本(這個副本不必定在ISR中)做爲leader,這兩種方法也是在可用性和一致性之間的權衡。
- 服務不可用方式這種適用在不容許消息丟失的狀況下使用,適用於一致性大於可用性,能夠有兩種作法
- 設置ISR最小同步副本數量,若是ISR的當前數量大於設置的最小同步值,那麼該分區纔會接受寫入,避免了ISR同步副本過少。若是小於最小值那麼該分區將不接收寫入。這個最小值設置只有在acks = all的時候纔會生效。
- 禁用unclean-leader選舉,當isr中的全部副本所有不可用時,不可使用OSR 中的副本做爲leader,直接使服務不可用,直到等到ISR 中副本恢復再進行選舉leader。
- 直接選擇第一個副本做爲leader的方式,適用於可用性大於一致性的場景,這也是kafka在isr中全部副本都死亡了的狀況採用的默認處理方式,咱們能夠經過配置參數
unclean.leader.election.enable
來禁止這種行爲,採用第一種方法。
那麼ISR是如何實現同步的呢?
broker的offset大體分爲三種:base offset、high watemark(HW)、log end offset(LEO)blog
- base offset:起始位移,replica中第一天消息的offset
- HW:replica高水印值,副本中最新一條已提交消息的位移。leader 的HW值也就是實際已提交消息的範圍,每一個replica都有HW值,但僅僅leader中的HW才能做爲標示信息。什麼意思呢,就是說當按照參數標準成功完成消息備份(成功同步給follower replica後)纔會更新HW的值,表明消息理論上已經不會丟失,能夠認爲「已提交」。
- LEO:日誌末端位移,也就是replica中下一條待寫入消息的offset,注意哈,是下一條而且是待寫入的,並非最後一條。這個LEO我的感受也就是用來標示follower的同步進度的。 因此HW表明已經完成同步的數據的位置,LEO表明已經寫入的最新位置,只有HW位置以前的纔是能夠被外界訪問的數據。 如今就來看一下以前,broker從收到消息到返回響應這個黑盒子裏發生了什麼。
- broker 收到producer的請求
- leader 收到消息,併成功寫入,LEO 值+1
- broker 將消息推給follower replica,follower 成功寫入 LEO +1 …
- 全部LEO 寫入後,leader HW +1
- 消息可被消費,併成功響應
上述過程從下面的圖即可以看出: 索引
解決上一個問題後,接下來就是kafka如何選用leader呢?
選舉leader經常使用的方法是多數選舉法,好比Redis等,可是kafka沒有選用多數選舉法,kafka採用的是quorum(法定人數)。生命週期
quorum是一種在分佈式系統中經常使用的算法,主要用來經過數據冗餘來保證數據一致性的投票算法。在kafka中該算法的實現就是ISR,在ISR中就是能夠被選舉爲leader的法定人數。
- 在leader宕機後,只能從ISR列表中選取新的leader,不管ISR中哪一個副本被選爲新的leader,它都知道HW以前的數據,能夠保證在切換了leader後,消費者能夠繼續看到HW以前已經提交的數據。
- HW的截斷機制:選出了新的leader,而新的leader並不能保證已經徹底同步了以前leader的全部數據,只能保證HW以前的數據是同步過的,此時全部的follower都要將數據截斷到HW的位置,再和新的leader同步數據,來保證數據一致。 當宕機的leader恢復,發現新的leader中的數據和本身持有的數據不一致,此時宕機的leader會將本身的數據截斷到宕機以前的hw位置,而後同步新leader的數據。宕機的leader活過來也像follower同樣同步數據,來保證數據的一致性。
若是感受這篇文章對您有所幫助,請點擊一下喜歡或者關注博主,您的喜歡和關注將是我前進的最大動力!
refer: effectivecoding 官網 博客