咱們先假設初始時世界是混沌的尚未盤古的開天闢地,協調者也是一片荒蕪人煙之地,沒有保存任何狀態,由於消費組的初始狀態是Stable,在第一次的Rebalance時,正常的尚未向消費組註冊過的消費者會執行狀態爲Stable
並且memberId=UNKNOWN_MEMBER_ID
條件分支。在第一次Rebalance以後,每一個消費者都分配到了一個成員編號,系統又會進入Stable穩定狀態(Stable穩定狀態包括兩種:一種是沒有任何消費者的穩定狀態,一種是有消費者的穩定狀態)。由於全部消費者在執行一次JoinGroup後並非說系統就一直保持這種不變的狀態,有可能由於這樣或那樣的事件致使消費者要從新進行JoinGroup,這個時候由於以前JoinGroup過了每一個消費者都是有成員編號的,處理方式確定是不同的。算法
因此定義一種事件驅動的狀態機就頗有必要了,這世界看起來是雜亂無章的,不過只要遵循着狀態機的規則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行着。session
private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String, clientHost: String,sessionTimeoutMs: Int,protocolType: String, protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) { if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) { //protocolType對於消費者是consumer,注意這裏的協議類型和PartitionAssignor協議不一樣哦 //協議類型目前總共就兩種消費者和Worker,而協議是PartitionAssignor分配算法 responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { //若是當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置爲未知成員,並讓消費者重試 responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) } else { group.currentState match { case Dead => responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這裏了! addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { val member = group.get(memberId) updateMemberAndRebalance(group, member, protocols, responseCallback) } case Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始時第一個消費者在這裏! //若是消費者成員編號是未知的,則向GroupMetadata註冊並被記錄下來 addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { //3.第二次Rebalance時第一個消費者在這裏,此時要分Leader仍是普通的消費者了 val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { updateMemberAndRebalance(group, member, protocols, responseCallback) } else { responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId, generationId = group.generationId,subProtocol = group.protocol, leaderId = group.leaderId,errorCode = Errors.NONE.code)) } } } if (group.is(PreparingRebalance)) joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } }
addMemberAndRebalance和updateMemberAndRebalance會建立或更新MemberMetadata,而且會嘗試調用prepareRebalance
,消費組中只有一個消費者有機會調用prepareRebalance,而且一旦調用該方法,會將消費組狀態更改成PreparingRebalance
,就會使得下一個消費者只能從case PreparingRebalance
入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態爲PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態還要判斷消費者是否是已經有了成員編號,一般是以前已經發生了Rebalance,這種影響也是比較巨大的,每一個消費者走的路徑跟第一次的Rebalance是徹底不一樣的迷宮地圖了。scala
1)第一次Rebalance如圖6-18的上半部分:code
圖6-18 第一次和第二次Rebalance事件
2)第二次Rebalance,對於以前加入過的消費者都要成員編號如圖6-18的下半部分:get
3)不過若是有消費者在Leader以前發送又有點不同了如圖6-19:it
圖6-19 Leader非第一個發送JoinGroup請求io
4)若是第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不一樣了如圖6-20:table
圖6-20 新增消費組第一個發送JoinGroup請求class
根據上面的幾種場景總結下來狀態機的規則和一些結論以下: