Apache Kafka入門教程輕鬆學-第四章 Kafka核心組件和流程-設計-原理(二)協調器(消費者和組協調器)

上一節介紹了kafka工作的核心組件--控制器。本節將介紹消費者密切相關的組件--協調器。它負責消費者的出入組工作。大家可以回想一下kafka核心概念中關於吃蘋果的場景,如果我邀請了100個人過來吃蘋果,如果沒有人告訴每個吃蘋果的人哪個是他的盤子,那豈不是要亂了套?協調器做的就是這個工作。當然還有更多。

2 協調器

顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啓動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。

主要的協調器有如下兩個:

1、消費者協調器(ConsumerCoordinator)

2、組協調器(GroupCoordinator)

此外還有任務管理協調器(WorkCoordinator),用作kafka connect的works管理,本教程不做講解。

kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊羣效應和腦裂問題。

爲了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啓動的時候,都會創建GroupCoordinator實例,管理部分消費組(集羣負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:

2.1 消費者協調器

消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。

消費者協調器主要負責如下工作:

1、更新消費者緩存的MetaData

2、向組協調器申請加入組

3、消費者加入組後的相應處理

4、請求離開消費組

5、向組協調器提交偏移量

6、通過心跳,保持組協調器的連接感知。

7、被組協調器選爲leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。

8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。

消費者協調器主要依賴的組件和說明見下圖:

可以看到這些組件和消費者協調器擔負的工作是可以對照上的。

 

2.2 組協調器

組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:

  1. 在與之連接的消費者中選舉出消費者leader
  2. 下發leader消費者返回的消費者分區分配結果給所有的消費者
  3. 管理消費者的消費偏移量提交,保存在kafka的內部主題中
  4. 和消費者心跳保持,知道哪些消費者已經死掉,組中存活的消費者是哪些。

組協調器在broker啓動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:

這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。

 

2.3 消費者入組過程

下圖展示了消費者啓動選取leader、入組的過程。

消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集羣的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關係持久化在kafka內部主題。

 

2.4 消費偏移量管理

消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裏開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化後,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到服務器,這樣就無法接着前任的工作繼續進行了。

因此只有把消費偏移量定期發送到服務器,由GroupCoordinator集中式管理,分區重分配後,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。

下圖展示了不提交offset到服務端的問題:

開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裏開始自己的消費。

因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。

由於kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組後,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重複消費或者丟消息。偏移量提交的時機和方式要格外注意!!

下面兩種情況分別會造成重複消費和丟消息:

  1. 如果提交的偏移量小於消費者最後一次消費的偏移量,那麼再均衡後,兩個offset之間的消息就會被重複消費
  2. 如果提交的偏移量大於消費者最後一次消費的偏移量,那麼再均衡後,兩個offset之間的消息就會丟失

以上兩種情況是如何產生的呢?我們繼續往下看。

2.4.1 偏移量有兩種提交方式

1、自動提交偏移量

設置 enable.auto.commit爲true,設定好週期,默認5s。消費者每次調用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。

這樣做很方便,但是會帶來重複消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,服務器端存儲的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取消息,此3s內消費的消息會被重複消費。

2、手動提交偏移量

設置 enable.auto.commit爲false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。

我們來看下面兩個提交時機:

  • 如果poll完馬上調用commitSync(),那麼一旦處理到中間某條消息的時候異常,由於偏移量已經提交,那麼出問題的消息位置到提交偏移量之間的消息就會丟失。

  • 如果處理完所有消息後才調用commitSync()。有可能在處理到一半的時候發生再均衡,此時偏移量還未提交,那麼再均衡後,會從上次提交的位置開始消費,造成重複消費。

 

比較起來,重複消費要比丟消息好一些,所以我們程序應採用第二種方式,同時消費邏輯中,要能夠檢查重複消費。

commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功後才往下運行。這樣會限制程序的吞吐量。如果降低提交頻次,又很容易發生重複消費。

這裏我們可以使用commitAsync()異步提交偏移量。只管提交,而不會等待broker返回提交結果

commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因爲重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重複消費消息。

commitAsync也支持回調,由於上述原因,回調中最好不要因爲失敗而重試提交。而是應該記錄錯誤,以便後續分析和補償。

 

2.4.2  偏移量提交的最佳實踐

關於偏移量的提交方式和時機,上文已經有了大量的講解。但看完後好像還不知道應該怎麼提交偏移量纔是最合適的。是不是覺得無論怎麼提交,都無法避免重複消費?沒錯,事實就是這樣,我們只能採用合理的方式,最大可能的去降低發生此類問題的概率。此外做好補償處理。

一般來說,偶爾的提交失敗,不去重試,是沒有問題的。因爲一般是因爲臨時的問題而失敗,後續的提交總會成功。如果我們在關閉消費者或者再均衡前,確保所有的消費者都能成功提交一次偏移量,也可以保證再均衡後,消費者能接着消費數據。

因此我們採用同步和異步混合的方式提交偏移量。

  1. 正常消費消息時,消費結束提交偏移量,採用異步方式
  2. 如果程序報錯,finally中,提交偏移量,採用同步方式,確保提交成功
  3. 再均衡前的回調方法中,提交偏移量,採用同步方式,確保提交成功

這樣既保證了吞吐量,也保證了提交偏移量的安全性。另外由於再均衡前提交偏移量,降低了重複消費可能。

kafka還提供了提交特定偏移量的方法。我們可以指定分區和offset進行提交。分區和offset的值可以從消息對象中取得。

另外,如果擔心一次取回數據量太大,可能處理到一半的時候出現再均衡,導致偏移量沒有提交,重複消費。那麼可以每n條提交一次。

而當n=1時,也就是處理一條數據就提交一次,會把重複消費的可能降到最低。同時由於增加了和服務端的通訊,效率大大降低。

其實即使這樣,也是可能重複消費的,試想如下場景:

  1. 消費者拉取到數據後,開始邏輯處理
  2. 處理第一條offset=2,成功了,提交offset=3
  3. 開始處理offset=3的消息,處理完成後,但提交offset=4前,此消費者突然意外掛掉了,所以也沒能進入異常處理。偏移量沒能成功提交。
  4. 消費者進行了再均衡,新的消費者接手此分區進行消費,取到的offset還是上一次提交的3,那麼將會重複消費offset=3的消息。

所以我們應平衡重複消費發生的概率和程序的效率,來設置提交的時機。同時程序邏輯一定做好重複消費的檢查工作!

 

2.5 回顧

本節從協調器講起,首先介紹了消費者協調器和組協調器,以及他們是如何配合工作的。從消費偏移量的管理展開,詳細介紹了偏移量的提交,及提交的最佳實踐。本節沒有涉及代碼部分,所有知識點相關的代碼將在最後一章中統一給出。現在的要求只是理解知識點。

--------------------- 本文來自 稀有氣體 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/liyiming2017/article/details/82867765?utm_source=copy