Controller選舉
當添加一個分區或分區增長副本的時候,都要從全部副本中選舉一個新的Leader出來。html
Leader若是選舉?投票怎麼玩?是否是全部的partition副本直接發起投票,開始競選呢?好比用ZK實現。算法
利用ZK如何實現選舉?ZK的什麼功能能夠感知到節點的變化(增長或減小)?或者說ZK爲何能實現加鎖和釋放鎖?apache
用到了3個特色:watch機制;節點不容許重複寫入;臨時節點。網絡
這樣實現是比較簡單,但也會存在必定弊端。若是分區和副本數量過多,全部的副本都直接選舉的話,一旦某個節點增減,就會形成大量watch事件被觸發,ZK的負載就會太重。架構
kafka早期的版本就是這樣作的,後來換了一種實現方式。分佈式
不是全部的repalica都參與leader選舉,而是由其中的一個Broker統一來指揮,這個Broker的角色就叫作Controller(控制器)。fetch
就像Redis Sentinel的架構,執行故障轉移的時候,必需要先從全部哨兵中選一個負責故障轉移的節點同樣。kafka 也要先從全部Broker中選出惟一的一個Controller。url
全部Broker會嘗試在zookeeper中建立臨時節點/controller,只有一個能建立成功(先到先得)。.net
若是Controller掛掉了或者網絡出現了問題,ZK上的臨時節點會消失。其餘的Brokder經過watch監聽到Controller下線的消息後,開始競選新的Controller。方法跟以前仍是同樣的,誰先在ZK裏寫入一個/cotroller節點,誰就成爲新的Controller。設計
成爲Controller節點以後,它的責任也比其餘節點重了幾分:
- 監聽Broker變化
- 監聽Topic變化
- 監聽Partition變化
- 獲取和管理Broker、Topic、Partition的信息
- 管理Partiontion的主從信息
分區副本Leader選舉
Controller肯定之後,就能夠開始作分區選主的事了。下面就是找候選人了。顯然,每一個replica都想推薦本身,但全部的replica都有競選資格嗎?並非,這裏有幾個概念。
Assigned-Replicas(AR):一個分區的全部副本。 In-Sync Replicas(ISR):上邊全部副本中,跟leader數據保持必定程度同步的。 Out-Sync Replicas(OSR):跟leader同步滯後過多的副本。
AR=ISR + OSR。正常狀況下OSR是空的,你們正常同步,AR=ISR。
誰能參加選舉?確定不是AR,也不是OR,而是ISR。並且這個ISR不是固定不變的,仍是一個動態列表。
前面說過,若是同步延遲超30秒,就踢出ISR,進入OSR;若是遇上來了就加入ISR。
默認狀況下,當leader副本發生故障時,只有在ISR集合中的副本纔有資格被選舉爲新的leader。
若是ISR爲空呢?羣龍不能無首。在這種狀況下,可讓ISR以外的副本參與選舉。容許ISR以外的副本參與選舉,叫作unclean leader election。
unclean.leader.election.enable=false
把這個參數改爲true(通常不建議開啓,會形成數據丟失)。
Controller有了,候選人也有了ISR,那麼根據什麼規則肯定leader呢?
咱們首先來看分佈式系統中常見的選舉協議有哪些(或者說共識算法)?
ZAB(ZK)、Raft(Redis Sentinel)他們都是Paxos算法的變種,核心思想概括起來都是:先到先得、少數服從多數。
但kafka沒有用這些方法,而是用了一種本身實現的算法。
爲何呢?好比ZAB這種協議,可能會出現腦裂(節點不能互通的時候,出現多個leader)、驚羣效應(大量watch事件被觸發)。
在文檔中有說明:
https://kafka.apachecn.org/documentation.html#design_replicatedlog
提到kafka的選舉實現,最相近的是微軟的PacificA算法。
在這種算法中,默認是讓ISR中第一個replica變成leader。像中國皇帝傳位同樣,優先傳給皇長子。
主從同步
leader肯定以後,客戶端的讀寫只能操做leader節點。follower須要向leader同步數據。
不一樣的raplica的offset是不同的,同步到底怎麼同步呢?
在以後內容,須要先理解幾個概念。
LEO(Log End Offset):下一條等待寫入的消息的offset(最新的offset + 1)。
HW(Hign Watermark 高水位):ISR中最小的LEO。Leader會管理全部ISR中最小的LEO爲HW。
consumer最多隻能消費到HW以前的位置。也就是說,其餘副本沒有同步過去的消息,是不能被消費的。
kafka爲何這麼設計?
若是在同步成功以前就被消費了,consumer group 的offset會偏大,若是leader崩潰,中間會丟失消息。
接着再看消息是如何同步的。
Replica 1與Replica2各同步了1條數據,HW推動了1,變成了7,LEO因Replica2推動了1,變成了7。
Replica 1與Replica2各同步了2條數據,HW和LEO重疊,都到了9。
在這須要瞭解一下,從節點如何與主節點保持同步?
- follower節點會向Leader發送一個fetch請求,leader向follower發送數據後,即須要更新follower的LEO。
- follower接收到數據響應後,依次寫入消息而且更新LEO。
- leader更新HW(ISR最小的LEO)
kafka設計了獨特的ISR複製,能夠在保障數據一致性狀況下又能夠提供高吞吐量。
Replica故障處理
follower故障
首先follower發生鼓掌,會被先踢出ISR。
follower恢復以後,從哪開始同步數據呢?
假設Replica1宕機。
恢復之後,首先根據以前的記錄的HW(6),把高於HW的消息截掉(六、7)。
而後向Leader同步消息。追上Leader以後(30秒),從新加入ISR。
leader故障
還以上圖爲例,若是圖中Leader發生故障。
首先選一個Leader,由於Replica1優先,它將成爲Leader。
爲了保證數據一致,其餘follower須要把高於HW的消息截掉(這裏沒有消息須要截取)。
而後Replica2同步數據。
此時原Leader中的數據8將丟失。
注意:這種機制只能保證副本之間的數據一致性,並不能保證數據不丟失或者不重複。