kafka-python 1.4.6 版本觸發的一個 rebalance 問題

在使用了最新版的 kafka-python 1.4.6 在 broker 對 topic 進行默認配置的狀況下報出相似錯誤css

CommitFailedError
CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

這裏要申明一點,在 1.4.0 以上的 kafka-python 版本使用了獨立的心跳線程去上報心跳。html

這裏報錯大概表達的意思是 沒法在默認 300000ms 中完成處理操做。咱們一般會一次性 poll 拉默認 500 條數據下來。咱們須要在 300s 中完成 500 條數據的處理。若是不能完成的話就可能會觸發這個問題。python

由於這個報錯的提示寫得很是清楚,因此咱們先按這個方向去嘗試處理這個問題。首先調高了咱們的 max_poll_interval_ms 的時間,可是無效。git

而後 records 的條數減小,依然無效,該報錯仍是會報錯。這不由讓我懷疑觸發這個問題的是否並不是這裏報錯建議的那些地方。github

 

因此我把目光放到了 broker 日誌那邊去,想看下究竟是由於什麼緣由致使爆出相似錯誤。bootstrap

在日誌上發現了一些日誌,對應的 consumer 在反覆的 rebalance:vim

[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-05ed83f1-aa90-4950-b097-4cf467598082 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-f7826720-fef7-4b02-8104-d1f38065c2fe in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Preparing to rebalance group sync_group_20180321 with old generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-ac5f6aff-3600-4e67-a529-31674c72b1e4 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,716] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1091 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,721] INFO [GroupCoordinator 0]: Assignment received from leader for group sync_group_20180321 for generation 1091 (kafka.coordinator.group.GroupCoordinator)

參考 sentry 打出來的錯誤,咱們能夠認爲這和 sentry 爆出來的問題有直接關係。所以咱們要從另一個角度去思考一下爲何個人 max_poll_interval_ms 已經調高而且每次拉取處理條數下降卻依然會報出此問題,而且頻繁觸發 rebalance 。session

kafka-python 在 1.4.0 版本分離了心跳線程和 poll 主線程。個人第一反應就是會不會由於 poll 線程阻塞了心跳線程的切換,或者引發了某種死鎖從而致使心跳線程沒法正確的發起心跳。最後致使 broker 認爲 group 已經死亡而後主動觸發了 rebalance .socket

而後我去 kafka-python 的 gihub 搜索了一下相似問題,立刻就發現了有很多人都有這個問題。ide

https://github.com/dpkp/kafka-python/issues/1418

從中找到一些有趣的信息,好比來自 

I am seeing consumer rebalances even if there is no messages to consume. Start three consumers in a group and send some messages to topic and after that stop the producer. The consumer will start seeing rebalances after 5-6mins.
Sample code here:
https://stackoverflow.com/questions/54908902/kafka-consumer-rebalance-occurs-even-if-there-is-no-message-to-consume

他說即便在沒有消息能夠消費的狀況下,也能夠看到 kafka consumer 在過了 5 - 6 mins 以後開啓了 rebalance 。

這就跟咱們的問題很是類似,咱們並非 process 的過程消耗的時間過長而觸發了 rebalance 而是有多是由於消費得太快,致使有些消費者處於 空 poll 的狀態從而阻塞了心跳線程。客觀來講,我目前還會報出這個問題的 topic 有多達 50 個partitions,我開啓了5個消費者對其進行消費,平均一個消費者須要消費 10 個parititons 。若是有某個 partitions 長期沒有消費過來咱們可能會被阻塞在那裏最終致使 heartbeat 超時。 1.4.6 的客戶端默認 10s 沒心跳就超時,而發送間隔僅爲 3s 。也就是連續三個週期沒有發送就超時了。

下面看到 dpkp 的一個回覆,表達了有可能就是被 poll 主線程阻塞,的問題,而且有 workaround 能夠用來避免這種狀況:

vimal: thanks for posting. I believe you may be hitting lock contention between an idle client.poll -- which can block and hold the client lock for the entire request_timeout_ms -- and the attempt by the heartbeat thread to send a new request. It seems to me that we may need to use KafkaClient.wakeup() to make sure that the polling thread drops the lock if/when we need to send a request from a different thread.

This shouldn't be an issue when messages are flowing through your topics at a steady rate. If this is just a test environment, and you expect your production environment to have more steady live data, then you could just ignore the error in testing. But if you are managing a topic w/ very low traffic -- delays of minutes between consecutive messages, for example -- you might try to reduce the request_timeout_ms to something closer to the heartbeat_interval_ms, which should prevent the read side from blocking for much longer than the heartbeat timeout. But note that other timeouts may also need to change (max_poll_interval_ms and session_timeout_ms perhaps). Another workaround might be to reduce metadata_max_age_ms to something close / equal to your heartbeat_timeout_ms. This will cause more frequent metadata requests, but should unblock the send side when there is no socket data available for reads.

 dpkp 的觀點在於,若是咱們數據發送過來的頻率是穩定的,消費者是正好能夠消費完隊列裏面的信息的狀況的時候,不該該出現這樣的問題。出現這樣的問題與咱們預期和看到報錯的狀況可能偏偏相反,不是咱們消費得太慢,而是咱們消費得太快,而且生產者發送消息的頻率太低致使的。在 poll 不到消息的時候,主線程可能會面臨阻塞,而沒法及時切換到心跳線程進行心跳的發送,最終致使了這個問題。

他給到一個 trick 的方法來解決這個問題,當面臨這種狀況的時候咱們能夠把 metadata_max_age_ms 調整到和心跳發送頻率差很少 close / equal to our heartbeat_timeout_ms.

發送 metadata_request 會解除咱們發送端的阻塞,從而達到抑制死鎖的效果。

self.kafka = kafka.KafkaConsumer(
    auto_offset_reset=auto_offset_reset,
    bootstrap_servers=['10.171.97.1:9092', '10.163.13.219:9092', '10.170.249.122:9092'],
    group_id=group_id,
    metadata_max_age_ms=metadata_max_age_ms
)
self.kafka.subscribe(topics)

嘗試補充了 metadata_max_age_ms 大約 3000 ms ,這個問題獲得了很大程度的解決和緩解。

既然肯定了多是由於消費太快,而後生產慢致使的主線程鎖住的問題,剩下能夠驗證一下是否真的是這樣。嘗試打日誌看一下切換線程發送心跳的狀況能夠來確認該問題是否如此。

另外看代碼發現 poll 主線程在 poll 以前會默認會進行 wakeup() 可是在 1.4.6裏面也由於以前的 某個 bug 而默認關閉了,不知道是否有影響,等後續測試以後補上。

 

 

Reference:

https://github.com/dpkp/kafka-python/issues/1418  Heartbeat failed for group xxxWorker because it is rebalancing

https://github.com/dpkp/kafka-python/issues/1760  [1.4.5] KafkaProducer raises KafkaTimeoutError when attempting wakeup()

https://www.cnblogs.com/huxi2b/p/6815797.html  Kafka 0.11版本新功能介紹 —— 空消費組延時rebalance

相關文章
相關標籤/搜索