上一節介紹了kafka工作的核心組件--控制器。本節將介紹消費者密切相關的組件--協調器。它負責消費者的出入組工作。大家可以回想一下kafka核心概念中關於吃蘋果的場景,如果我邀請了100個人過來吃蘋果,如果沒有人告訴每個吃蘋果的人哪個是他的盤子,那豈不是要亂了套?協調器做的就是這個工作。當然還有更多。
顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啓動後,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴於協調器。
主要的協調器有如下兩個:
1、消費者協調器(ConsumerCoordinator)
2、組協調器(GroupCoordinator)
此外還有任務管理協調器(WorkCoordinator),用作kafka connect的works管理,本教程不做講解。
kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊羣效應和腦裂問題。
爲了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啓動的時候,都會創建GroupCoordinator實例,管理部分消費組(集羣負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:
消費者協調器,可以看作是消費者做操作的代理類(其實並不是),消費者很多操作通過消費者協調器進行處理。
消費者協調器主要負責如下工作:
1、更新消費者緩存的MetaData
2、向組協調器申請加入組
3、消費者加入組後的相應處理
4、請求離開消費組
5、向組協調器提交偏移量
6、通過心跳,保持組協調器的連接感知。
7、被組協調器選爲leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。
8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。
消費者協調器主要依賴的組件和說明見下圖:
可以看到這些組件和消費者協調器擔負的工作是可以對照上的。
組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:
組協調器在broker啓動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:
這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。
下圖展示了消費者啓動選取leader、入組的過程。
消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集羣的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關係持久化在kafka內部主題。
消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪裏開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化後,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到服務器,這樣就無法接着前任的工作繼續進行了。
因此只有把消費偏移量定期發送到服務器,由GroupCoordinator集中式管理,分區重分配後,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。
下圖展示了不提交offset到服務端的問題:
開始時,consumer 0消費partition 0 和1,後來由於新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由於consumer之間是不能通訊的,所有consumer2並不知道從哪裏開始自己的消費。
因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作後,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。
由於kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組後,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重複消費或者丟消息。偏移量提交的時機和方式要格外注意!!
下面兩種情況分別會造成重複消費和丟消息:
以上兩種情況是如何產生的呢?我們繼續往下看。
1、自動提交偏移量
設置 enable.auto.commit爲true,設定好週期,默認5s。消費者每次調用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。
這樣做很方便,但是會帶來重複消費的問題。假如最近一次偏移量提交3s後,觸發了再均衡,服務器端存儲的還是上次提交的偏移量,那麼再均衡結束後,新的消費者會從最後一次提交的偏移量開始拉取消息,此3s內消費的消息會被重複消費。
2、手動提交偏移量
設置 enable.auto.commit爲false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。
我們來看下面兩個提交時機:
比較起來,重複消費要比丟消息好一些,所以我們程序應採用第二種方式,同時消費邏輯中,要能夠檢查重複消費。
commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功後才往下運行。這樣會限制程序的吞吐量。如果降低提交頻次,又很容易發生重複消費。
這裏我們可以使用commitAsync()異步提交偏移量。只管提交,而不會等待broker返回提交結果
commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因爲重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那麼更小的偏移量會覆蓋大的偏移量。那麼如果此時發生再均衡,新的消費者將會重複消費消息。
commitAsync也支持回調,由於上述原因,回調中最好不要因爲失敗而重試提交。而是應該記錄錯誤,以便後續分析和補償。
關於偏移量的提交方式和時機,上文已經有了大量的講解。但看完後好像還不知道應該怎麼提交偏移量纔是最合適的。是不是覺得無論怎麼提交,都無法避免重複消費?沒錯,事實就是這樣,我們只能採用合理的方式,最大可能的去降低發生此類問題的概率。此外做好補償處理。
一般來說,偶爾的提交失敗,不去重試,是沒有問題的。因爲一般是因爲臨時的問題而失敗,後續的提交總會成功。如果我們在關閉消費者或者再均衡前,確保所有的消費者都能成功提交一次偏移量,也可以保證再均衡後,消費者能接着消費數據。
因此我們採用同步和異步混合的方式提交偏移量。
這樣既保證了吞吐量,也保證了提交偏移量的安全性。另外由於再均衡前提交偏移量,降低了重複消費可能。
kafka還提供了提交特定偏移量的方法。我們可以指定分區和offset進行提交。分區和offset的值可以從消息對象中取得。
另外,如果擔心一次取回數據量太大,可能處理到一半的時候出現再均衡,導致偏移量沒有提交,重複消費。那麼可以每n條提交一次。
而當n=1時,也就是處理一條數據就提交一次,會把重複消費的可能降到最低。同時由於增加了和服務端的通訊,效率大大降低。
其實即使這樣,也是可能重複消費的,試想如下場景:
所以我們應平衡重複消費發生的概率和程序的效率,來設置提交的時機。同時程序邏輯一定做好重複消費的檢查工作!
本節從協調器講起,首先介紹了消費者協調器和組協調器,以及他們是如何配合工作的。從消費偏移量的管理展開,詳細介紹了偏移量的提交,及提交的最佳實踐。本節沒有涉及代碼部分,所有知識點相關的代碼將在最後一章中統一給出。現在的要求只是理解知識點。
--------------------- 本文來自 稀有氣體 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/liyiming2017/article/details/82867765?utm_source=copy