估計運維年前沒有祭拜服務器,Nginx的問題修復了,Kafka又不行了。今天,原本想再睡會,結果,電話又響了。仍是運營,「喂,冰河,到公司了嗎?趕忙看看服務器吧,又出問題了「。「在路上了,運維那哥們兒還沒上班嗎」? 「還在休假。。。」, 我:「。。。」。哎,這哥們兒是跑路了嗎?先無論他,問題仍是要解決。java
到公司後,放下我專用的雙肩包,拿出個人利器——筆記本電腦,打開後迅速登陸監控系統,發現主要業務系統沒啥問題。一個非核心服務發出了告警,而且監控系統中顯示這個服務頻繁的拋出以下異常。spring
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
從上面輸出的異常信息,大概能夠判斷出系統出現的問題:Kafka消費者在處理完一批poll消息後,在同步提交偏移量給broker時報錯了。大概就是由於當前消費者線程的分區被broker給回收了,由於Kafka認爲這個消費者掛掉了,咱們能夠從下面的輸出信息中能夠看出這一點。apache
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Kafka內部觸發了Rebalance機制,明確了問題,接下來,咱們就開始分析問題了。編程
既然Kafka觸發了Rebalance機制,那我就來講說Kafka觸發Rebalance的時機。bash
舉個具體點的例子,好比某個分組下有10個Consumer實例,這個分組訂閱了一個50個分區的主題。正常狀況下,Kafka會爲每一個消費者分配5個分區。這個分配的過程就是Rebalance。服務器
當Kafka中知足以下條件時,會觸發Rebalance:微信
後面兩種狀況咱們能夠人爲的避免,在實際工做過程當中,對於Kafka發生Rebalance最多見的緣由是消費組成員的變化。網絡
消費者成員正常的添加和停掉致使Rebalance,這種狀況沒法避免,可是時在某些狀況下,Consumer 實例會被 Coordinator 錯誤地認爲 「已中止」 從而被「踢出」Group,致使Rebalance。session
當 Consumer Group 完成 Rebalance 以後,每一個 Consumer 實例都會按期地向 Coordinator 發送心跳請求,代表它還存活着。若是某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認爲該 Consumer 已經 「死」 了,從而將其從 Group 中移除,而後開啓新一輪 Rebalance。這個時間能夠經過Consumer 端的參數 session.timeout.ms
進行配置。默認值是 10 秒。架構
除了這個參數,Consumer 還提供了一個控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms
。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是可以更加快速地知曉當前是否開啓 Rebalance,由於,目前 Coordinator 通知各個 Consumer 實例開啓 Rebalance 的方法,就是將 REBALANCE_NEEDED
標誌封裝進心跳請求的響應體中。
除了以上兩個參數,Consumer 端還有一個參數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms
參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示 Consumer 程序若是在 5 分鐘以內沒法消費完 poll 方法返回的消息,那麼 Consumer 會主動發起 「離開組」 的請求,Coordinator 也會開啓新一輪 Rebalance。
經過上面的分析,咱們能夠看一下那些rebalance是能夠避免的:
第一類非必要 Rebalance 是由於未能及時發送心跳,致使 Consumer 被 「踢出」Group 而引起的。這種狀況下咱們能夠設置 session.timeout.ms 和 heartbeat.interval.ms 的值,來儘可能避免rebalance的出現。(如下的配置是在網上找到的最佳實踐,暫時還沒測試過)
session.timeout.ms >= 3 * heartbeat.interval.ms
。將 session.timeout.ms 設置成 6s 主要是爲了讓 Coordinator 可以更快地定位已經掛掉的 Consumer,早日把它們踢出 Group。
第二類非必要 Rebalance 是 Consumer 消費時間過長致使的。此時,max.poll.interval.ms
參數值的設置顯得尤其關鍵。若是要避免非預期的 Rebalance,最好將該參數值設置得大一點,比下游最大處理時間稍長一點。
總之,要爲業務處理邏輯留下充足的時間。這樣,Consumer 就不會由於處理這些消息的時間太長而引起 Rebalance 。
kafka
的偏移量(offset
)是由消費者進行管理的,偏移量有兩種,拉取偏移量
(position)與提交偏移量
(committed)。拉取偏移量表明當前消費者分區消費進度。每次消息消費後,須要提交偏移量。在提交偏移量時,kafka
會使用拉取偏移量
的值做爲分區的提交偏移量
發送給協調者。
若是沒有提交偏移量,下一次消費者從新與broker鏈接後,會從當前消費者group已提交到broker的偏移量處開始消費。
因此,問題就在這裏,當咱們處理消息時間太長時,已經被broker剔除,提交偏移量又會報錯。因此拉取偏移量沒有提交到broker,分區又rebalance。下一次從新分配分區時,消費者會從最新的已提交偏移量處開始消費。這裏就出現了重複消費的問題。
其實,說了這麼多,Kafka消費者輸出的異常日誌中也給出了相應的解決方案。
接下來,咱們說說Kafka中的拉取偏移量和提交偏移量。
其實,從輸出的日誌信息中,也大概給出瞭解決問題的方式,簡單點來講,就是能夠經過增長 max.poll.interval.ms
時長和 session.timeout.ms
時長,減小 max.poll.records
的配置值,而且消費端在處理完消息時要及時提交偏移量。
經過以前的分析,咱們應該知道如何解決這個問題了。這裏須要說一下的是,我在集成Kafka的時候,使用的是SpringBoot和Kafka消費監聽器,消費端的主要代碼結構以下所示。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); try { Object value = record.value(); logger.info(value.toString()); ack.acknowledge(); } catch (Exception e) { logger.error("日誌消費端異常: {}", e); } }
上述代碼邏輯比較簡單,就是獲取到Kafka中的消息後直接打印輸出到日誌文件中。
這裏,我先根據異常日誌的提示信息進行配置,因此,我在SpringBoot的application.yml文件中新增了以下配置信息。
spring: kafka: consumer: properties: max.poll.interval.ms: 3600000 max.poll.records: 50 session.timeout.ms: 60000 heartbeat.interval.ms: 3000
配置完成後,再次測試消費者邏輯,發現仍是拋出Rebalance異常。
咱們從另外一個角度來看下Kafka消費者所產生的問題:一個Consumer在生產消息,另外一個Consumer在消費它的消息,它們不能在同一個groupId 下面,更改其中一個的groupId 便可。
這裏,咱們的業務項目是分模塊和子系統進行開發的,例如模塊A在生產消息,模塊B消費模塊A生產的消息。此時,修改配置參數,例如 session.timeout.ms: 60000
,根本不起做用,仍是拋出Rebalance
異常。
此時,我嘗試修改下消費者分組的groupId,將下面的代碼
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
修改成以下所示的代碼。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
再次測試,問題解決~~
此次解決的問題真是個奇葩啊!!接下來寫個【Kafka系列】專題,詳細介紹Kafka的原理、源碼解析和實戰等內容,小夥伴們大家以爲呢?歡迎文末留言討論~~
好了,今天就到這兒吧,我是冰河,你們有啥問題能夠在下方留言,也能夠加我微信:sun_shine_lyz,我拉你進羣,一塊兒交流技術,一塊兒進階,一塊兒牛逼~~