致使kafka的重複消費問題緣由在於,已經消費了數據,可是offset沒來得及提交(好比Kafka沒有或者不知道該數據已經被消費)。
總結如下場景緻使Kakfa重複消費:java
緣由1:強行kill線程,致使消費後的數據,offset沒有提交(消費系統宕機、重啓等)。
緣由2:設置offset爲自動提交,關閉kafka時,若是在close以前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啓會重複消費。
例如:spring
try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); } catch (Exception e) { }
上面代碼會致使部分offset沒提交,下次啓動時會重複消費。apache
解決方法:設置offset自動提交爲falsebootstrap
整合了Spring配置的修改以下配置
spring配置:session
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest
整合了API方式的修改enable.auto.commit爲false
API配置:併發
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false");
一旦設置了 enable.auto.commit 爲 true,Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的全部消息。從順序上來講,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,所以它能保證不出現消費丟失的狀況。oop
緣由3:(重複消費最多見的緣由):消費後的數據,當offset尚未提交時,partition就斷開鏈接。好比,一般會遇到消費的數據,處理很耗時,致使超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那麼就會re-blance重平衡,此時有必定概率offset沒提交,會致使重平衡後重復消費。 this
緣由4:當消費者從新分配partition的時候,可能出現從頭開始消費的狀況,致使重發問題。 線程
緣由5:當消費者消費的速度很慢的時候,可能在一個session週期內還未完成,致使心跳機制檢測報告出問題。日誌
緣由6:併發很大,可能在規定的時間(session.time.out默認30s)內沒有消費完,就會可能致使reblance重平衡,致使一部分offset自動提交失敗,而後重平衡後重復消費
問題描述:
咱們系統壓測過程當中出現下面問題:異常rebalance,並且平均間隔3到5分鐘就會觸發rebalance,分析日誌發現比較嚴重。錯誤日誌以下:
08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
這個錯誤的意思是,消費者在處理完一批poll的消息後,在同步提交偏移量給broker時報的錯。初步分析日誌是因爲當前消費者線程消費的分區已經被broker給回收了,由於kafka認爲這個消費者死了,那麼爲何呢?
問題分析:
這裏就涉及到問題是消費者在建立時會有一個屬性max.poll.interval.ms(默認間隔時間爲300s),
該屬性意思爲kafka消費者在每一輪poll()調用之間的最大延遲,消費者在獲取更多記錄以前能夠空閒的時間量的上限。若是此超時時間期滿以前poll()沒有被再次調用,則消費者被視爲失敗,而且分組將從新平衡,以便將分區從新分配給別的成員。
由於offset此時已經不許確,生產環境不能直接去修改offset偏移量。
因此從新指定了一個消費組(group.id=order_consumer_group),而後指定auto-offset-reset=latest這樣我就只須要重啓個人服務了,而不須要動kafka和zookeeper了!
#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest
注:若是你想要消費者從頭開始消費某個topic的全量數據,能夠從新指定一個全新的group.id=new_group,而後指定auto-offset-reset=earliest便可