Kafka消費組(consumer group)

1、 誤區澄清與概念明確

1 Kafka的版本html

不少人在Kafka中國社區(替羣主作個宣傳,QQ號:162272557)提問時的開頭常常是這樣的:「我使用的kafka版本是2.10/2.11, 如今碰到一個奇怪的問題。。。。」 無心冒犯,但這裏的2.10/2.11不是kafka的版本,而是編譯kafka的Scala版本。Kafka的server端代碼是由Scala語言編寫的,目前Scala主流的3個版本分別是2.十、2.11和2.12。實際上Kafka如今每一個PULL request都已經自動增長了這三個版本的檢查。下圖是個人一個PULL request,能夠看到這個fix會同時使用3個scala版本作編譯檢查:java

目前普遍使用kafka的版本應該是這三個大版本:0.8.x, 0.9.x和0.10.* 。 這三個版本對於consumer和consumer group來講都有很大的變化,咱們後面會詳談。git

2 新版本 VS 老版本github

「個人kafkaoffsetmonitor爲何沒法監控到offset了?」——這是我在Kafka中國社區見到最多的問題,沒有之一!實際上,Kafka 0.9開始提供了新版本的consumer及consumer group,位移的管理與保存機制發生了很大的變化——新版本consumer默認將再也不保存位移到zookeeper中,而目前kafkaoffsetmonitor尚未應對這種變化(雖然已經有不少人在要求他們改了,詳見https://github.com/quantifind/KafkaOffsetMonitor/issues/79),因此頗有多是由於你使用了新版本的consumer纔沒法看到的。關於新舊版本,這裏統一說明一下:kafka0.9之前的consumer是使用Scala編寫的,包名結構是kafka.consumer.*,分爲high-level consumer和low-level consumer兩種。咱們熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是這個版本提供的;自0.9版本開始,Kafka提供了java版本的consumer,包名結構是o.a.k.clients.consumer.*,熟知的類包括KafkaConsumer和ConsumerRecord等。新版本的consumer能夠單獨部署,再也不須要依賴server端的代碼。正則表達式

 

2、消費者組 (Consumer Group)apache

1 什麼是消費者組服務器

其實對於這些基本概念的普及,網上資料實在太多了。我本不該該再多此一舉了,但爲了本文的完整性,我仍是要花一些篇幅來重談consumer group,至少能夠說說個人理解。值得一提的是,因爲咱們今天基本上只探討consumer group,對於單獨的消費者不作過多討論。網絡

什麼是consumer group? 一言以蔽之,consumer group是kafka提供的可擴展且具備容錯性的消費者機制。既然是一個組,那麼組內必然能夠有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的全部消費者協調在一塊兒來消費訂閱主題(subscribed topics)的全部分區(partition)。固然,每一個分區只能由同一個消費組內的一個consumer來消費。(網上文章中說到此處各類炫目多彩的圖就會緊跟着拋出來,我這裏就不畫了,請原諒)。我的認爲,理解consumer group記住下面這三個特性就行了:session

  • consumer group下能夠有一個或多個consumer instance,consumer instance能夠是一個進程,也能夠是一個線程
  • group.id是一個字符串,惟一標識一個consumer group
  • consumer group下訂閱的topic下的每一個分區只能分配給某個group下的一個consumer(固然該分區還能夠被分配給其餘group)

2 消費者位置(consumer position) 數據結構

消費者在消費的過程當中須要記錄本身消費了多少數據,即消費位置信息。在Kafka中這個位置信息有個專門的術語:位移(offset)。不少消息引擎都把這部分信息保存在服務器端(broker端)。這樣作的好處固然是實現簡單,但會有三個主要的問題:1. broker今後變成有狀態的,會影響伸縮性;2. 須要引入應答機制(acknowledgement)來確認消費成功。3. 因爲要保存不少consumer的offset信息,必然引入複雜的數據結構,形成資源浪費。而Kafka選擇了不一樣的方式:每一個consumer group保存本身的位移信息,那麼只須要簡單的一個整數表示位置就夠了;同時能夠引入checkpoint機制按期持久化,簡化了應答機制的實現。

3 位移管理(offset management)

3.1 自動VS手動

Kafka默認是按期幫你自動提交位移的(enable.auto.commit = true),你固然能夠選擇手動提交位移實現本身控制。另外kafka會按期把group消費狀況保存起來,作成一個offset map,以下圖所示:

上圖中代表了test-group這個組當前的消費狀況。

 

3.2 位移提交

老版本的位移是提交到zookeeper中的,圖就不畫了,總之目錄結構是:/consumers/<group.id>/offsets/<topic>/<partitionId>,可是zookeeper其實並不適合進行大批量的讀寫操做,尤爲是寫操做。所以kafka提供了另外一種解決方案:增長__consumeroffsets topic,將offset信息寫入這個topic,擺脫對zookeeper的依賴(指保存offset這件事情)。__consumer_offsets中的消息保存了每一個consumer group某一時刻提交的offset信息。依然以上圖中的consumer group爲例,格式大概以下:

 

__consumers_offsets topic配置了compact策略,使得它老是可以保存最新的位移信息,既控制了該topic整體的日誌容量,也能實現保存最新offset的目的。compact的具體原理請參見:Log Compaction

至於每一個group保存到__consumers_offsets的哪一個分區,如何查看的問題請參見這篇文章:Kafka 如何讀取offset topic內容 (__consumer_offsets)

 

4 Rebalance

4.1 什麼是rebalance?

rebalance本質上是一種協議,規定了一個consumer group下的全部consumer如何達成一致來分配訂閱topic的每一個分區。好比某個group下有20個consumer,它訂閱了一個具備100個分區的topic。正常狀況下,Kafka平均會爲每一個consumer分配5個分區。這個分配的過程就叫rebalance。

4.2 何時rebalance?

這也是常常被說起的一個問題。rebalance的觸發條件有三種:

  • 組成員發生變動(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了——這二者的區別後面會談到)
  • 訂閱主題數發生變動——這固然是可能的,若是你使用了正則表達式的方式進行訂閱,那麼新建匹配正則表達式的topic就會觸發rebalance
  • 訂閱主題的分區數發生變動

4.3 如何進行組內分區分配?

以前提到了group下的全部consumer都會協調在一塊兒共同參與分配,這是如何完成的?Kafka新版本consumer默認提供了兩種分配策略:range和round-robin。固然Kafka採用了可插拔式的分配策略,你能夠建立本身的分配器以實現不一樣的分配策略。實際上,因爲目前range和round-robin兩種分配器都有一些弊端,Kafka社區已經提出第三種分配器來實現更加公平的分配策略,只是目前還在開發中。咱們這裏只須要知道consumer group默認已經幫咱們把訂閱topic的分區分配工做作好了就好了。

簡單舉個例子,假設目前某個consumer group下有兩個consumer: A和B,當第三個成員加入時,kafka會觸發rebalance並根據默認的分配策略從新爲A、B和C分配分區,以下圖所示:

 

4.4 誰來執行rebalance和consumer group管理?

Kafka提供了一個角色:coordinator來執行對於consumer group的管理。坦率說kafka對於coordinator的設計與修改是一個很長的故事。最新版本的coordinator也與最初的設計有了很大的不一樣。這裏我只想說起兩次比較大的改變。

首先是0.8版本的coordinator,那時候的coordinator是依賴zookeeper來實現對於consumer group的管理的。Coordinator監聽zookeeper的/consumers/<group>/ids的子節點變化以及/brokers/topics/<topic>數據變化來判斷是否須要進行rebalance。group下的每一個consumer都本身決定要消費哪些分區,並把本身的決定搶先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注冊。很明顯,這種方案要依賴於zookeeper的幫助,並且每一個consumer是單獨作決定的,沒有那種「你們屬於一個組,要協商作事情」的精神。

基於這些潛在的弊端,0.9版本的kafka改進了coordinator的設計,提出了group coordinator——每一個consumer group都會被分配一個這樣的coordinator用於組管理和位移管理。這個group coordinator比原來承擔了更多的責任,好比組成員管理、位移提交保護機制等。當新版本consumer group的第一個consumer啓動的時候,它會去和kafka server肯定誰是它們組的coordinator。以後該group內的全部成員都會和該coordinator進行協調通訊。顯而易見,這種coordinator設計再也不須要zookeeper了,性能上能夠獲得很大的提高。後面的全部部分咱們都將討論最新版本的coordinator設計。

4.5 如何肯定coordinator?

上面簡單討論了新版coordinator的設計,那麼consumer group如何肯定本身的coordinator是誰呢? 簡單來講分爲兩步:

  • 肯定consumer group位移信息寫入__consumers_offsets的哪一個分區。具體計算公式:
    •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
  • 該分區leader所在的broker就是被選定的coordinator

4.6 Rebalance Generation

JVM GC的分代收集就是這個詞(嚴格來講是generational),我這裏把它翻譯成「屆」好了,它表示了rebalance以後的一屆成員,主要是用於保護consumer group,隔離無效offset提交的。好比上一屆的consumer成員是沒法提交位移到新一屆的consumer group中。咱們有時候能夠看到ILLEGAL_GENERATION的錯誤,就是kafka在抱怨這件事情。每次group進行rebalance以後,generation號都會加1,表示group進入到了一個新的版本,以下圖所示: Generation 1時group有3個成員,隨後成員2退出組,coordinator觸發rebalance,consumer group進入Generation 2,以後成員4加入,再次觸發rebalance,group進入Generation 3.

 

4.7 協議(protocol)

前面說過了, rebalance本質上是一組協議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協議來處理與consumer group coordination相關的問題:

  • Heartbeat請求:consumer須要按期給coordinator發送心跳來代表本身還活着
  • LeaveGroup請求:主動告訴coordinator我要離開consumer group
  • SyncGroup請求:group leader把分配方案告訴組內全部成員
  • JoinGroup請求:成員請求加入組
  • DescribeGroup請求:顯示組的全部信息,包括成員信息,協議名稱,分配方案,訂閱信息等。一般該請求是給管理員使用

Coordinator在rebalance的時候主要用到了前面4種請求。
4.8 liveness

consumer如何向coordinator證實本身還活着? 經過定時向coordinator發送Heartbeat請求。若是超過了設定的超時時間,那麼coordinator就認爲這個consumer已經掛了。一旦coordinator認爲某個consumer掛了,那麼它就會開啓新一輪rebalance,而且在當前其餘consumer的心跳response中添加「REBALANCE_IN_PROGRESS」,告訴其餘consumer:很差意思各位,大家從新申請加入組吧!

4.9 Rebalance過程

終於說到consumer group執行rebalance的具體流程了。不少用戶估計對consumer內部的工做機制也很感興趣。下面就跟你們一塊兒討論一下。固然我必需要明確表示,rebalance的前提是coordinator已經肯定了。

整體而言,rebalance分爲2步:Join和Sync

1 Join, 顧名思義就是加入組。這一步中,全部成員都向coordinator發送JoinGroup請求,請求入組。一旦全部成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。

2 Sync,這一步leader開始分配消費方案,即哪一個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容爲空。coordinator接收到分配方案以後會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的全部成員就都知道本身應該消費哪些分區了。

仍是拿幾張圖來講明吧,首先是加入組的過程:

值得注意的是, 在coordinator收集到全部成員請求前,它會把已收到請求放入一個叫purgatory(煉獄)的地方。記得國內有篇文章以此來證實kafka開發人員都是頗有文藝範的,寫得也是比較有趣,有興趣能夠去搜搜。
而後是分發分配方案的過程,即SyncGroup請求:

注意!! consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是由於這樣作能夠有更好的靈活性。好比這種機制下我能夠實現相似於Hadoop那樣的機架感知(rack-aware)分配方案,即爲consumer挑選同一個機架下的分區數據,減小網絡傳輸的開銷。Kafka默認爲你提供了兩種分配策略:range和round-robin。因爲這不是本文的重點,這裏就再也不詳細展開了,你只須要記住你能夠覆蓋consumer的參數:partition.assignment.strategy來實現本身分配策略就行了。

4.10 consumer group狀態機

和不少kafka組件同樣,group也作了個狀態機來代表組狀態的流轉。coordinator根據這個狀態機會對consumer group作不一樣的處理,以下圖所示(徹底是根據代碼註釋手動畫的,多見諒吧)

 

簡單說明下圖中的各個狀態:

  • Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各類請求都是一個response: UNKNOWN_MEMBER_ID
  • Empty:組內無成員,可是位移信息尚未過時。這種狀態只能響應JoinGroup請求
  • PreparingRebalance:組準備開啓新的rebalance,等待成員加入
  • AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  • Stable:rebalance完成!能夠開始消費了~

至於各個狀態之間的流程條件以及action,這裏就不具體展開了。

 

3、rebalance場景剖析

上面詳細闡述了consumer group是如何執行rebalance的,可能依然有些雲裏霧裏。這部分對其中的三個重要的場景作詳盡的時序展開,進一步加深對於consumer group內部原理的理解。因爲圖比較直觀,全部的描述都將以圖的方式給出,不作過多的文字化描述了。

1 新成員加入組(member join) 

 

2 組成員崩潰(member failure)

前面說過了,組成員崩潰和組成員主動離開是兩個不一樣的場景。由於在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能須要一個完整的session.timeout週期才能檢測到這種崩潰,這必然會形成consumer的滯後。能夠說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。okay,直接上圖: 

3 組成員主動離組(member leave group)

4 提交位移(member commit offset)

相關文章
相關標籤/搜索