kafka協調者

咱們先假設初始時世界是混沌的尚未盤古的開天闢地,協調者也是一片荒蕪人煙之地,沒有保存任何狀態,由於消費組的初始狀態是Stable,在第一次的Rebalance時,正常的尚未向消費組註冊過的消費者會執行狀態爲Stable並且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance以後,每一個消費者都分配到了一個成員編號,系統又會進入Stable穩定狀態(Stable穩定狀態包括兩種:一種是沒有任何消費者的穩定狀態,一種是有消費者的穩定狀態)。由於全部消費者在執行一次JoinGroup後並非說系統就一直保持這種不變的狀態,有可能由於這樣或那樣的事件致使消費者要從新進行JoinGroup,這個時候由於以前JoinGroup過了每一個消費者都是有成員編號的,處理方式確定是不同的。算法

因此定義一種事件驅動的狀態機就頗有必要了,這世界看起來是雜亂無章的,不過只要遵循着狀態機的規則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行着。session

 

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,
    clientHost: String,sessionTimeoutMs: Int,protocolType: String,
    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {
  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {
    //protocolType對於消費者是consumer,注意這裏的協議類型和PartitionAssignor協議不一樣哦
    //協議類型目前總共就兩種消費者和Worker,而協議是PartitionAssignor分配算法
    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
    //若是當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置爲未知成員,並讓消費者重試
    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
  } else { group.currentState match {
    case Dead =>
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這裏了!
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else {
        val member = group.get(memberId)
        updateMemberAndRebalance(group, member, protocols, responseCallback)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始時第一個消費者在這裏!
        //若是消費者成員編號是未知的,則向GroupMetadata註冊並被記錄下來
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else { //3.第二次Rebalance時第一個消費者在這裏,此時要分Leader仍是普通的消費者了
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols)) {
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,
            generationId = group.generationId,subProtocol = group.protocol,
            leaderId = group.leaderId,errorCode = Errors.NONE.code))
        }
      }
    }
    if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

addMemberAndRebalance和updateMemberAndRebalance會建立或更新MemberMetadata,而且會嘗試調用prepareRebalance,消費組中只有一個消費者有機會調用prepareRebalance,而且一旦調用該方法,會將消費組狀態更改成PreparingRebalance,就會使得下一個消費者只能從case PreparingRebalance入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態爲PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態還要判斷消費者是否是已經有了成員編號,一般是以前已經發生了Rebalance,這種影響也是比較巨大的,每一個消費者走的路徑跟第一次的Rebalance是徹底不一樣的迷宮地圖了。scala

1)第一次Rebalance如圖6-18的上半部分:code

  1. 第一個消費者,狀態爲Stable,沒有編號,addMemberAndRebalance,成爲Leader,執行prepareRebalance,更改狀態爲PreparingRebalance,建立DelayedJoin
  2. 第二個消費者,狀態爲PreparingRebalance,沒有編號,addMemberAndRebalance(不執行prepareRebalance,由於在狀態改變成PreparingRebalance後就不會被執行了);後面的消費者同第二個
  3. 全部消費者都要等協調者收集完全部成員編號在DelayedJoin完成時纔會收到JoinGroup響應

 

圖6-18 第一次和第二次Rebalance事件

2)第二次Rebalance,對於以前加入過的消費者都要成員編號如圖6-18的下半部分:get

  1. 第一個消費者是Leader,狀態爲Stable,有編號,updateMemberAndRebalance,更改狀態爲PreparingRebalance,建立DelayedJoin
  2. 第二個消費者,狀態爲PreparingRebalance,有編號,updateMemberAndRebalance;後面的消費者同第二個
  3. 全部消費者也要等待,由於其餘消費者發送Join請求在Leader消費者以後。

3)不過若是有消費者在Leader以前發送又有點不同了如圖6-19:it

  1. 第一個消費者不是Leader,狀態爲Stable,有編號,responseCallback,當即收到JoinGroup響應,好幸運啊!
  2. 第二個消費者若是也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應
  3. 第三個消費者是Leader(領導來了),狀態爲Stable(什麼,大家以前的消費者居然都沒更新狀態!,由於他們都沒有add或update),有編號,updateMemberAndRebalance(仍是我第一個調用add或update,看來仍是隻能我來更新狀態),更改狀態爲PreparingRebalance,建立DelayedJoin
  4. 第四個消費者不是Leader,狀態爲PreparingRebalance,有編號,updateMemberAndRebalance(前面有領導,很差意思了,不能當即返回JoinGroup給你了,大家這些剩下的消費者都只能和領導一塊兒返回了,算大家倒黴)

 

圖6-19 Leader非第一個發送JoinGroup請求io

4)若是第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不一樣了如圖6-20:table

  1. 第一個消費者不是Leader,狀態爲Stable,沒有編號,addMemberAndRebalance,執行prepareRebalance(我是第一個調用add或update的哦,大家都別想跟我搶這個頭彩了),更改狀態爲PreparingRebalance(我不是Leader但我驕傲啊),建立DelayedJoin(我搶到頭彩,固然建立DelayedJoin的工做只能由我來完成了)
  2. 第二個消費者也不是Leader,恭喜你,協調者也放過他,直接返回JoinGroup響應
  3. 第三個消費者是Leader(領導來了),狀態爲PreparingRebalance(有個新來的不懂規矩,他已經把狀態改了),有編號,updateMemberAndRebalance(有人已經改了,你老就不用費心思了),凡是沒有當即返回響應的,都須要等待,領導也不例外
  4. 第四個消費者不是Leader(廢話,只有一個領導,並且領導已經在前面了),不會當即返回響應(你看領導都排隊呢)
  5. 雖然DelayedJoin是由沒有編號的消費者建立,不過因爲DelayedJoin是以消費組爲級別的,因此不用擔憂,上一次選舉出來的領導仍是領導,協調者最終仍是會把members交給領導,不會是給那個沒有編號的消費者的,雖說在他註冊的時候已經有編號了,可是你們不認啊。不過領導其實不在乎是誰開始觸發prepareRebalance的,那我的要負責生成DelayedJoin,而不論是領導本身仍是其餘人一旦更改狀態爲PreparingRebalance,後面的消費者都要等待DelayedJoin完成了,而領導者老是要等待的,因此他固然無所謂了,由於他知道最後協調者老是會把members交給他的。

 

圖6-20 新增消費組第一個發送JoinGroup請求class

根據上面的幾種場景總結下來狀態機的規則和一些結論以下:

  1. 第一個調用addMemberAndRebalance或者updateMemberAndRebalance的會將狀態改成PreparingRebalance,而且負責生成DelayedJoin
  2. 一旦狀態進入PreparingRebalance,其餘消費者就只能從PreparingRebalance狀態入口進入,這裏只有兩種選擇addMemberAndRebalance或者updateMemberAndRebalance,不過他們不會更改狀態,也不會生成DelayedJoin
  3. 發生DelayedJoin以後,其餘消費者的JoinGroup響應都會被延遲,由於如規則2中,他們只能調用add或update,沒法當即調用responseCallback,因此就要和DelayedJoin的那個消費者一塊兒等待
  4. 正常流程時,發生responseCallback的是存在成員編號的消費者在Leader以前發送了JoinGroup,或者新增長的消費者發送了JoinGroup請求以前
  5. 第一次Rebalance時,第一個消費者會建立DelayedJoin,以後的Rebalance,只有新增的消費者纔有機會建立(若是他在Leader以前發送的話,若是在Leader以後就沒有機會了),而普通消費者老是沒有機會建立DelayedJoin的,由於狀態爲Stable時,他會直接開溜,有人(Leader或者新增長的消費者)建立了DelayedJoin以後,他又在那邊怨天尤人只能等待
相關文章
相關標籤/搜索