kafka-Reblance

誰來執行Rebalance以及管理consumer的group呢

coordinator來執行對於consumer group的管理,當consumer group的第一個consumer啓動的時候,它會去和kafka server肯定誰是它們組的coordinator。以後該group內的全部成員都會和該coordinator進行協調通訊算法

如何肯定coordinator?

consumer group如何肯定本身的coordinator是誰呢, 消費者向kafka集羣中的任意一個broker發送一個GroupCoordinatorRequest請求,服務端會返回一個負載最小的broker節點的id,並將該broker設置爲coordinator.網絡

JoinGroup的過程

整個rebalance的過程分爲兩個步驟,Join和Syncspa

join: 表示加入到consumer group中,在這一步中,全部的成員都會向coordinator發送joinGroup的請求。一旦全部成員都發送了joinGroup請求,那麼coordinator會選擇一個consumer擔任leader角色,並把組成員信息和訂閱信息發送消費者
leader選舉算法比較簡單,若是消費組內沒有leader,那麼第一個加入消費組的消費者就是消費者leader,若是這個時候leader消費者退出了消費組,那麼從新選舉一個leader,這個選舉很隨意,相似於隨機算法server

 

protocol_metadata: 序列化後的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator會選擇一個座位leader,對應的就是member_id
member_metadata 對應消費者的訂閱信息
members:consumer group中所有的消費者的訂閱信息
generation_id: 年代信息,相似於以前講解zookeeper的時候的epoch是同樣的,對於每一輪rebalance,generation_id都會遞增。主要用來保護consumer group。隔離無效的offset提交。也就是上一輪的consumer成員沒法提交offset到新的consumer group中。blog

 

肯定分區分配策略

每一個消費者均可以設置本身的分區分配策略,對於消費組而言,會從各個消費者上報過來的分區分配策略中選舉一個彼此都贊同的策略來實現總體的分區分配,這個"贊同"的規則是,消費組內的各個消費者會經過投票來決定.kafka

在joingroup階段,每一個consumer都會把本身支持的分區分配策略發送到coordinator,coordinator手機到全部消費者的分配策略,組成一個候選集,每一個消費者須要從候選集裏找出一個本身支持的策略,而且爲這個策略投票
最終計算候選集中各個策略的選票數,票數最多的就是當前消費組的分配策略同步

 

Synchronizing Group State階段

完成分區分配以後,就進入了Synchronizing Group State階段,主要邏輯是向GroupCoordinator發送SyncGroupRequest請求,而且處理SyncGroupResponse響應,簡單來講,就是leader將消費者對應的partition分配方案同步給consumer group 中的全部consumerit

 

 

 

 

每一個消費者都會向coordinator發送syncgroup請求,不過只有leader節點會發送分配方案,其餘消費者只是打打醬油而已。當leader把方案發給coordinator之後,coordinator會把結果設置到SyncGroupResponse中。這樣全部成員都知道本身應該消費哪一個分區。
Ø consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是由於這樣作能夠有更好的靈活性.io

 

總結


咱們再來總結一下consumer group rebalance的過程集羣

 

  1. Ø 對於每一個consumer group子集,都會在服務端對應一個GroupCoordinator進行管理,GroupCoordinator會在zookeeper上添加watcher,當消費者加入或者退出consumer group時,會修改zookeeper上保存的數據,從而觸發GroupCoordinator開始Rebalance操做
  2. Ø 當消費者準備加入某個Consumer group或者GroupCoordinator發生故障轉移時,消費者並不知道GroupCoordinator的在網絡中的位置,這個時候就須要肯定GroupCoordinator,消費者會向集羣中的任意一個Broker節點發送ConsumerMetadataRequest請求,收到請求的broker會返回一個response做爲響應,其中包含管理當前ConsumerGroup的GroupCoordinator,
  3. Ø 消費者會根據broker的返回信息,鏈接到groupCoordinator,而且發送HeartbeatRequest,發送心跳的目的是要要奧噶蘇GroupCoordinator這個消費者是正常在線的。當消費者在指定時間內沒有發送心跳請求,則GroupCoordinator會觸發Rebalance操做。

Ø 發起join group請求,兩種狀況

  1. 若是GroupCoordinator返回的心跳包數據包含異常,說明GroupCoordinator由於前面說的幾種狀況致使了Rebalance操做,那這個時候,consumer會發起join group請求
  2. 新加入到consumer group的consumer肯定好了GroupCoordinator之後,消費者會向GroupCoordinator發起join group請求,
  3. GroupCoordinator會收集所有消費者信息以後,來確承認用的消費者,並從中選取一個消費者成爲group_leader。並把相應的信息(分區分配策略、leader_id、…)封裝成response返回給全部消費者,可是隻有group leader會收到當前consumer group中的全部消費者信息
  4. 當消費者肯定本身是group leader之後,會根據消費者的信息以及選定分區分配策略進行分區分配接着進入Synchronizing Group State階段,
  5. 每一個消費者會發送SyncGroupRequest請求到GroupCoordinator,可是隻有Group Leader的請求會存在分區分配結果(Leader負責根據分區分配規則進行分區分配),GroupCoordinator會根據Group Leader的分區分配結果造成SyncGroupResponse返回給全部的Consumer。
  6. consumer根據分配結果,執行相應的操做

 

 

注: 參照自咕泡mic 

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息