coordinator來執行對於consumer group的管理,當consumer group的第一個consumer啓動的時候,它會去和kafka server肯定誰是它們組的coordinator。以後該group內的全部成員都會和該coordinator進行協調通訊算法
consumer group如何肯定本身的coordinator是誰呢, 消費者向kafka集羣中的任意一個broker發送一個GroupCoordinatorRequest請求,服務端會返回一個負載最小的broker節點的id,並將該broker設置爲coordinator.網絡
整個rebalance的過程分爲兩個步驟,Join和Syncspa
join: 表示加入到consumer group中,在這一步中,全部的成員都會向coordinator發送joinGroup的請求。一旦全部成員都發送了joinGroup請求,那麼coordinator會選擇一個consumer擔任leader角色,並把組成員信息和訂閱信息發送消費者
leader選舉算法比較簡單,若是消費組內沒有leader,那麼第一個加入消費組的消費者就是消費者leader,若是這個時候leader消費者退出了消費組,那麼從新選舉一個leader,這個選舉很隨意,相似於隨機算法server
protocol_metadata: 序列化後的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator會選擇一個座位leader,對應的就是member_id
member_metadata 對應消費者的訂閱信息
members:consumer group中所有的消費者的訂閱信息
generation_id: 年代信息,相似於以前講解zookeeper的時候的epoch是同樣的,對於每一輪rebalance,generation_id都會遞增。主要用來保護consumer group。隔離無效的offset提交。也就是上一輪的consumer成員沒法提交offset到新的consumer group中。blog
每一個消費者均可以設置本身的分區分配策略,對於消費組而言,會從各個消費者上報過來的分區分配策略中選舉一個彼此都贊同的策略來實現總體的分區分配,這個"贊同"的規則是,消費組內的各個消費者會經過投票來決定.kafka
在joingroup階段,每一個consumer都會把本身支持的分區分配策略發送到coordinator,coordinator手機到全部消費者的分配策略,組成一個候選集,每一個消費者須要從候選集裏找出一個本身支持的策略,而且爲這個策略投票
最終計算候選集中各個策略的選票數,票數最多的就是當前消費組的分配策略同步
完成分區分配以後,就進入了Synchronizing Group State階段,主要邏輯是向GroupCoordinator發送SyncGroupRequest請求,而且處理SyncGroupResponse響應,簡單來講,就是leader將消費者對應的partition分配方案同步給consumer group 中的全部consumerit
每一個消費者都會向coordinator發送syncgroup請求,不過只有leader節點會發送分配方案,其餘消費者只是打打醬油而已。當leader把方案發給coordinator之後,coordinator會把結果設置到SyncGroupResponse中。這樣全部成員都知道本身應該消費哪一個分區。
Ø consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是由於這樣作能夠有更好的靈活性.io
咱們再來總結一下consumer group rebalance的過程集羣
Ø 發起join group請求,兩種狀況
注: 參照自咕泡mic