Kafka消費者組靜態成員(static consumer member)

Kafka 2.3發佈後官網的Consumer參數中增長了一個新的參數:group.instance.id。下面是這個參數的解釋:html

A unique identifier of the consumer instance provided by end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.session

大體意思是:它是用戶指定的一個consumer成員ID。每一個消費者組下這些ID必須是惟一的。一旦設置了該ID,該消費者就會被視爲是一個靜態成員(Static Member)。靜態成員配以較大的session超時設置可以避免因成員臨時不可用(好比重啓)而引起的Rebalance。因而可知,消費者組靜態成員是2.3版本新引入的一個概念,主要是爲了不沒必要要的Rebalance。ide

 Rebalance Recap

以前咱們在Kafka消費者組一文中討論過Rebalance機制。它的主要做用是爲消費者組下全部成員分配分區。Client端和Broker端須要同時參與到Rebalance過程。在Broker端,Coordinator組件負責處理成員管理,好比處理組成員發送的JoinGroup請求、SyncGroup請求、Heartbeat請求和LeaveGroup請求;在Client端,Leader Consumer成員接收Coordinator發送的成員訂閱信息,而後根據必定的策略(Range/Round-Robin/Sticky/自定義)制定分配方案。oop

Rebalance發生的條件有三個:this

  • 成員數量發生變化,即有新成員加入或現有成員離組(包括主動離組和崩潰被動離組)
  • 訂閱主題數量發生變化
  • 訂閱主題分區數量發生變化

其實,後兩個條件能夠合併成一個,即Rebalance觸發條件只有兩個:1. 成員數量發生變化;2. 訂閱信息發生變化。spa

Rebalance的流程在那篇文章中也談到了:首先,各個成員發送JoinGroup請求入組,Coordinator會等待一段時間等它們加入——這段時間由全部成員中max.poll.interval.ms的最大值來決定(在Kafka Connect中則是有專屬的參數rebalance.timeout.ms來指定)。以後各成員發送SyncGroup請求等待Coordinator發送分配方案,而後開始正常消費。在消費的同時,各個consumer還會按期(heartbeat.interval.ms)上報心跳,告訴Coordinator組件它還活着。設計

 Issues for Rebalance

在實際場景中,由於成員離組而發生的Rebalance應該算是最多的,但有些場景下的Rebalance是很是不合理的。好比咱們公司就有這樣的痛點:Consumer的處理邏輯發生變動,必需要更新代碼從新上線,此時就要引起Rebalance,但其實重啓Consumer也許只須要幾分鐘而已,也就是說個人消費只要中斷幾分鐘就能夠了,Kafka徹底不必爲這個就觸發一輪Rebalance,更沒有必要從新分配分區,維持以前的分配方案足矣。雖然社區提供的Sticky分配方案在必定程度上可以緩解此問題,但Rebalance的Stop The World(STW)的特性仍是決定了生產環境中Rebalance越少越好。rest

 Static Member

在目前的Rebalance設計中,消費者組下的每一個實例都會被Coordinator分配一個成員ID,即member.id。不少Kafka用戶都有過這樣的疑問:我能手動設置這個member.id嗎?很遺憾,這個memberID是Kafka自動生成的,在靜態成員被引入前,規則是client.id-UUID,這裏的client.id就是Consumer端參數client.id的值,並且這個ID會隨着每輪Rebalance發生變化的。換句話說,Coordinator沒法持久化地保存某個consumer實例的member.id。我想這多是制約Rebalance時全部成員必須強制從新加入的部分緣由,由於Coordinator沒法記住每一個成員都是誰。若是你看源代碼,能夠發如今每次Client重啓回來發送JoinGroup時,它會封裝一個UNKNOWN_MEMBER_ID的空串,沒有任何有意義的信息給到Broker端。Coordinator接收到後只能把它當作是一個全新的成員。相反地,若是member.id可以被記住,那麼Coordinator就能夠容忍它短暫的離線而不開啓Rebalance,從而縮短消費者組總體不可用的時間窗口。code

爲此,社區於2.3和2.4版本引入了靜態成員(Static Member)的概念以及一個新的Consumer端參數:group.instance.id。一旦配置了該參數,成員將自動成爲靜態成員,不然的話和之前同樣依然被視爲是動態成員。你能夠認爲這個新參數是一個要被持久化的新member.id。它依然不能由用戶指定,構建規則是`group.instsance.id`-UUID。和member.id不一樣的是,每次成員重啓回來後,其靜態成員ID值是不變的,所以以前分配給該成員的全部分區也是不變的,並且在沒有超時前靜態成員重啓回來是不會觸發Rebalance的。htm

 靜態成員Rebalance條件

 顯然,靜態成員觸發Rebalance的難度要小於動態成員。若是使用了靜態成員,如今觸發Rebalance的條件變動爲:

  • 新成員加入組:這個條件依然不變。當有新成員加入時確定會觸發Rebalance從新分配分區
  • Leader成員從新加入組:好比主題分配方案發生變動
  • 現有成員離組時間超過了session超時時間:即便它是靜態成員,Coordinator也不會無限期地等待它。一旦超過了session超時時間依然會觸發Rebalance
  • Coordinator接收到LeaveGroup請求:成員主動通知Coordinator永久離組。畢竟Kafka仍是要提供方法讓一個成員可以永遠地退出組,此時重啓Rebalance仍是必要的

 請求協議變動

 爲了支持group.instance.id,與消費者組相關的協議格式也要作對應的變化。我看了下官網,JoinGroup、SyncGroup、LeaveGroup和OffsetCommit請求的協議格式都作了相應的變動。好比JoinGroup請求的Request和Response格式都增長了group-instance-id字段,以下所示:

JoinGroup Request (Version: 5) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols]
  group_id => STRING
  session_timeout_ms => INT32
  rebalance_timeout_ms => INT32
  member_id => STRING
  group_instance_id => NULLABLE_STRING
  protocol_type => STRING
  protocols => name metadata
  name => STRING
  metadata => BYTES

JoinGroup Response (Version: 5) => throttle_time_ms error_code generation_id protocol_name leader member_id [members]
  throttle_time_ms => INT32
  error_code => INT16
  generation_id => INT32
  protocol_name => STRING
  leader => STRING
  member_id => STRING
  members => member_id group_instance_id metadata
  member_id => STRING
  group_instance_id => NULLABLE_STRING
  metadata => BYTES

 其餘請求格式的變動也是相似的,這裏就不貼了。

 其餘變動

鑑於目前靜態成員短暫重啓或不可用不會觸發Rebalance的改動,社區對消費者組最大session過時時間也作了修改。以前Consumer端參數group.min.session.timeout.ms值是6秒——要想在這個時間內重啓完一個應用一般都是很困難的,所以社區如今將該值默認值改成30分鐘。這就是說,只要配置有靜態成員的Consumer程序代碼更新及重啓在30分鐘以內完成,Consumer Group就不會發生Rebalance。固然在這段時間內,該Consumer的消費進度會中斷,可是分區分配方案不會發生變化。

 總結

目前靜態成員的部分功能已經集成進Kafka 2.3版本,還有一部分功能正在開發中,將來會進到2.4版本中。從目前的設計來看,靜態成員機制可以幫助咱們規避不少線上環境中本沒必要要的Rebalance,應該說是個很使人期待的新特性。同時,社區針對Rebalance的Stop The World醞釀一次大的修正,即所謂的增量協同式Rebalance(Incremental Cooperative Rebalance)。大體思想是容許單個consumer實例自行採用增量或漸進式的方式進行Rebalance,避免全局的STW。相關的代碼正在開發中,後續我也會帶來這方面的功能介紹。

相關文章
相關標籤/搜索