本篇文章討論的kafka版本是目前最新版 0.10.1.0。java
全部broker會經過ZooKeeper選舉出一個做爲KafkaController,來負責:apache
每一個分區能夠有多個副本,分散在不一樣的broker上。緩存
leader副本:被KafkaController選舉出來的,做爲該分區的leader服務器
其餘follower副本:其餘副本都做爲follower副本微信
isr列表:簡單描述就是,"跟得上"leader的副本列表(包含leader),最開始是全部副本。這裏的跟得上是指多線程
每個producer發送消息給某個分區的leader副本,其餘follower副本又會複製該消息。producer端有一個acks參數能夠設置:異步
同時對於isr列表的數量要求也有一個配置fetch
咱們本篇文章就重點經過kafka的原理來揭示在acks=-1的狀況下,哪些狀況下會丟失數據,或許能夠提一些改進措施來作到不丟失數據。線程
下面會先介紹下leader和follower副本複製的原理scala
leader副本的屬性
leader副本擁有其餘副本的記錄,保存着他們的以下屬性:
follower副本的屬性
其中follower會不斷的向leader發送fetch請求,若是沒有數據fetch則被leader阻塞一段時間,等待新數據的來臨,一旦來臨則解除阻塞,複製數據給follower。
咱們來看下當producer的acks=-1時,一次消息寫入的整個過程,上述是屬性是怎麼變化的
1.3.1 消息準備寫入leader副本,leader副本首先判斷當前isr列表是否小於min.insync.replicas,不小於才容許寫入。
若是不小於,leader寫入到本身的log中,獲得該消息的offset,而後對其餘follower的fetch請求解除阻塞,複製必定量的消息給follower
同時leader將本身最新的highWatermarkMetadata傳給follower
同時會判斷此次複製是否複製到leader副本的末尾了,即logEndOffsetMetadata位置,若是是的話,則更新上述的lastCaughtUpTimeMs
1.3.2 follower會將fetch來的數據寫入到本身的log中,本身的logEndOffsetMetadata獲得了更新,同時更新本身的highWatermarkMetadata,就是取leader傳來的highWatermarkMetadata和本身的logEndOffsetMetadata中的最小值
而後follower再一次向leader發送fetch請求,fetch的初始offset就是本身的logEndOffsetMetadata+1。
1.3.3 leader副本收到該fetch後,會更新leader副本中該follower的logEndOffsetMetadata爲上述fetch的offset,同時會對全部的isr列表的logEndOffsetMetadata排序獲得最小的logEndOffsetMetadata做爲最新的highWatermarkMetadata
若是highWatermarkMetadata已經大於了leader寫入該消息的offset了,說明該消息已經被isr列表都複製過了,則leader開始迴應producer
判斷當前isr列表的size是否小於min.insync.replicas,若是小於返回NotEnoughReplicasAfterAppendException異常,不小於則表明正常寫入了。
1.3.4 follower在下一次的fetch請求的響應中就會獲得leader最新的highWatermarkMetadata,更新本身的highWatermarkMetadata
若是某個broker掛了,leader副本在該broker上的分區就要從新進行leader選舉。來簡要描述下leader選舉的過程
1.4.1 KafkaController會監聽ZooKeeper的/brokers/ids節點路徑,一旦發現有broker掛了,執行下面的邏輯。這裏暫時先不考慮KafkaController所在broker掛了的狀況,KafkaController掛了,各個broker會從新leader選舉出新的KafkaController
1.4.2 leader副本在該broker上的分區就要從新進行leader選舉,目前的選舉策略是
1.4.2.1 優先從isr列表中選出第一個做爲leader副本
1.4.2.2 若是isr列表爲空,則查看該topic的unclean.leader.election.enable配置。
unclean.leader.election.enable:爲true則表明容許選用非isr列表的副本做爲leader,那麼此時就意味着數據可能丟失,爲false的話,則表示不容許,直接拋出NoReplicaOnlineException異常,形成leader副本選舉失敗。
1.4.2.3 若是上述配置爲true,則從其餘副本中選出一個做爲leader副本,而且isr列表只包含該leader副本。
一旦選舉成功,則將選舉後的leader和isr和其餘副本信息寫入到該分區的對應的zk路徑上。
1.4.3 KafkaController向上述相關的broker上發送LeaderAndIsr請求,將新分配的leader、isr、所有副本等信息傳給他們。同時將向全部的broker發送UpdateMetadata請求,更新每一個broker的緩存的metadata數據。
1.4.4 若是是leader副本,更新該分區的leader、isr、全部副本等信息。若是本身以前就是leader,則如今什麼操做都不用作。若是以前不是leader,則需將本身保存的全部follower副本的logEndOffsetMetadata設置爲UnknownOffsetMetadata,以後等待follower的fetch,就會進行更新
1.4.5 若是是follower副本,更新該分區的leader、isr、全部副本等信息
而後將日誌截斷到本身保存的highWatermarkMetadata位置,即日誌的logEndOffsetMetadata等於了highWatermarkMetadata
最後建立新的fetch請求線程,向新leader不斷髮送fetch請求,初次fetch的offset是logEndOffsetMetadata。
上述重點就是leader副本的日誌不作處理,而follower的日誌則須要截斷到highWatermarkMetadata位置。
至此,算是簡單描述了分區的基本狀況,下面就針對上述過程來討論下kafka分區的高可用和一致性問題。
哪些場景下會丟失消息?
acks= 0、1,很明顯都存在消息丟失的可能。
即便設置acks=-1,當isr列表爲空,若是unclean.leader.election.enable爲true,則會選擇其餘存活的副本做爲新的leader,也會存在消息丟失的問題。
即便設置acks=-1,當isr列表爲空,若是unclean.leader.election.enable爲false,則不會選擇其餘存活的副本做爲新的leader,即犧牲了可用性來防止上述消息丟失問題。
即便設置acks=-1,而且選出isr中的副本做爲leader的時候,仍然是會存在丟數據的狀況的:
s1 s2 s3是isr列表,還有其餘副本爲非isr列表,s1是leader,一旦某個日誌寫入到s1 s2 s3,則s1將highWatermarkMetadata提升,並回復了客戶端ok,可是s2 s3的highWatermarkMetadata可能還沒被更新,此時s1掛了,s2當選leader了,s2的日誌不變,可是s3就要截斷日誌了,這時已經回覆客戶端的日誌是沒有丟的,由於s2已經複製了。
可是若是此時s2一旦掛了,s3當選,則s3上就不存在上述日誌了(前面s2當選leader的時候s3已經將日誌截斷了),這時候就形成日誌丟失了。
其實咱們是但願上述最後一個場景可以作到不丟消息的,可是目前的作法仍是可能會丟消息的。
丟消息最主要的緣由是:
因爲follower的highWatermarkMetadata相對於leader的highWatermarkMetadata是延遲更新的,當leader選舉完成後,全部follower副本的截斷到本身的highWatermarkMetadata位置,則可能截斷了已被老leader提交了的日誌,這樣的話,這部分日誌僅僅存在新的leader副本中,在其餘副本中消失了,一旦leader副本掛了,這部分日誌就完全丟失了
這個截斷到highWatermarkMetadata的操做的確太狠了,可是它的用途有一個就是:避免了日誌的不一致的問題。經過每次leader選舉以後的日誌截斷,來達到和leader之間日誌的一致性,避免出現日誌錯亂的狀況。
ZooKeeper和Raft的實現也有相似的日誌複製的問題,那ZooKeeper和Raft的實現有沒有這種問題呢?他們是如何解決的呢?
Raft並不進行日誌的截斷操做,而是會經過每第二天志複製時的一致性檢查來進行日誌的糾正,達到和leader來保持一致的目的。不截斷日誌,那麼對於已經提交的日誌,則必然存在過半的機器上從而可以保證日誌基本是不會丟失的。
ZooKeeper只有當某個follower的記錄超出leader的部分纔會截斷,其餘的不會截斷的。選舉出來的leader是通過過半pk的,必然是包含所有已經被提交的日誌的,即便該leader掛了,再次從新選舉,因爲不進行日誌截斷,仍然是能夠選出其餘包含所有已提交的日誌的(有過半的機器都包含所有已提交的日誌)。ZooKeeper對於日誌的糾正則是在leader選舉完成後專門開啓一個糾正過程。
kafka的截斷到highWatermarkMetadata的確有點太粗暴了,若是不截斷日誌,則須要解決日誌錯亂的問題,即便不可以像ZooKeeper那樣花大代價專門開啓一個糾正過程,能夠像Raft那樣每次在fetch的時候能夠進行不斷的糾正。這一塊還有待繼續關注。
kafka目前是隻能保證一個分區內的數據是有序的。
可是你可能常常據說,一旦某個broker掛了,就可能產生亂序問題(也沒人指出亂序的緣由),是否正確呢?
首先來看看如何能保證單個分區內消息的有序性,有以下幾個過程:
3.1 producer按照消息的順序進行發送
不少時候爲了發送效率,採用的辦法是多線程、異步、批量發送。
若是爲了保證順序,則不能使用多線程來執行發送任務。
異步:通常是把消息先發到一個隊列中,由後臺線程不斷的執行發送任務。這種方式對消息的順序也是有影響的:
如先發送消息1,後發送消息2,此時服務器端在處理消息1的時候返回了異常,可能在處理消息2的時候成功了,此時若再重試消息1就會形成消息亂序的問題。因此producer端須要先確認消息1發送成功了才能執行消息2的發送。
對於kafka來講,目前是異步、批量發送。解決異步的上述問題就是配置以下屬性:
max.in.flight.requests.per.connection=1
即producer發現一旦還有未確認發送成功的消息,則後面的消息不容許發送。
3.2 相同key的消息可以hash到相同的分區
正常狀況下是沒問題的,可是一旦某個分區掛了,如本來總共4個分區,此時只有3個分區存活,在此分區恢復的這段時間內,是否會存在hash錯亂到別的分區?
那就要看producer端獲取的metadata信息是否會立馬更新成3個分區。目前看來應該是不會的
producer見到的metadata數據是各個broker上的緩存數據,這些緩存數據是由KafkaController來統一進行更新的。一旦leader副本掛了,KafkaController並不會去立馬更新成3個分區,而是去執行leader選舉,選舉完成後纔會去更新metadata數據,此時選舉完成後仍然是保證4個分區的,也就是說producer是不可能獲取到只有3個分區的metadata數據的,因此producer端仍是能正常hash的,不會錯亂分區的。
在整個leader選舉恢復過程,producer最可能是沒法寫入數據(後期能夠重試)。
3.3 系統對順序消息的支持
leader副本按照消息到來的前後順序寫入本地日誌的,一旦寫入日誌後,該順序就肯定了,follower副本也是按照該順序進行復制的。對於消息的提交也是按照消息的offset從低到高來確認提交的,因此說kafka對於消息的處理是順序的。
3.4 consumer可以按照消息的順序進行消費
爲了接收的效率,可能會使用多線程進行消費。這裏爲了保證順序就只能使用單線程來進行消費了。
目前kafka的Consumer有scala版本的和java版本的(這一塊以後再詳細探討),最新的java版本,對用戶提供一個poll方法,用戶本身去決定是使用多線程仍是單線程。
如何看待kafka的isr列表設計?和過半怎麼對比呢?
對於相同數量的2n個follower加一個leader,過半呢則容許n個follower掛掉,而isr呢則容許2n個follower掛掉(可是會存在丟失消息的問題),因此過半更多會犧牲可用性(掛掉一半以上就不可用了)來加強數據的一致性,而isr會犧牲一致性來加強可用性(掛掉一半以上扔可以使用,可是存在丟數據的問題)
可是在確認效率上:過半僅僅須要最快的n+1的寫入成功便可斷定爲成功,而isr則須要2n+1的寫入成功纔算成功。同時isr是動態變化的過程,一旦跟不上或者跟上了都會離開或者加入isr列表。isr列表越小寫入速度就會加快。
有哪些環節會形成消息的重複消費?若是避免不了,如何去減小重複?
producer端重複發送
producer端因發送超時等等緣由作重試操做,目前broker端作重複請求的判斷仍是很難的,目前kafka也沒有去作,而是存儲完消息以後,若是開啓了Log compaction,它會經過kafka消息中的key來斷定是不是重複消息,是的話則會刪除。
consumer消費後,未及時提交消費的offset便掛了,下次恢復後就會重複消費
這個目前來講並無通用的解決辦法,先消費後提交offset可能會重複,先提交offset後消費可能形成消息丟失,因此通常仍是優先保證消息不丟,在業務上去作去重判斷。
本文章涉及到的話題不少,不免有疏漏的地方,還請批評指正。
歡迎繼續來討論,越辯越清晰。
歡迎關注微信公衆號:乒乓狂魔