消息重複和丟失是kafka中很常見的問題,主要發生在如下三個階段:數據庫
生產發送的消息沒有收到正確的broke響應,致使producer重試。緩存
producer發出一條消息,broke落盤之後由於網絡等種種緣由發送端獲得一個發送失敗的響應或者網絡中斷,而後producer收到一個可恢復的Exception重試消息致使消息重複。 網絡
說明:
1. new KafkaProducer()後建立一個後臺線程KafkaThread掃描RecordAccumulator中是否有消息;
2. 調用KafkaProducer.send()發送消息,實際上只是把消息保存到RecordAccumulator中;
3. 後臺線程KafkaThread掃描到RecordAccumulator中有消息後,將消息發送到kafka集羣;
4. 若是發送成功,那麼返回成功;
5. 若是發送失敗,那麼判斷是否容許重試。若是不容許重試,那麼返回失敗的結果;若是容許重試,把消息再保存到RecordAccumulator中,等待後臺線程KafkaThread掃描再次發送;異步
異常是RetriableException類型或者TransactionManager容許重試;RetriableException類繼承關係以下:ide
若是設置max.in.flight.requests.per.connection大於1(默認5,單個鏈接上發送的未確認請求的最大數量,表示上一個發出的請求沒有確認下一個請求又發出了)。大於1可能會改變記錄的順序,由於若是將兩個batch發送到單個分區,第一個batch處理失敗並重試,可是第二個batch處理成功,那麼第二個batch處理中的記錄可能先出現被消費。spa
設置max.in.flight.requests.per.connection爲1,可能會影響吞吐量,能夠解決單臺producer發送順序問題。若是多個producer,producer1先發送一個請求,producer2後發送請求,這是producer1返回可恢復異常,重試必定次數成功了。雖然時producer1先發送消息,可是producer2發送的消息會被先消費。線程
要啓動kafka的冪等性,無需修改代碼,默認爲關閉,須要修改配置文件:enable.idempotence=true 同時要求 ack=all 且 retries>1。3d
冪等原理:日誌
每一個producer有一個producer id,服務端會經過這個id關聯記錄每一個producer的狀態,每一個producer的每條消息會帶上一個遞增的sequence,服務端會記錄每一個producer對應的當前最大sequence,producerId + sequence ,若是新的消息帶上的sequence不大於當前的最大sequence就拒絕這條消息,若是消息落盤會同時更新最大sequence,這個時候重發的消息會被服務端拒掉從而避免消息重複。該配置一樣應用於kafka事務中。blog
可能會丟消息,適用於吞吐量指標重要性高於數據丟失,例如:日誌收集。
producer發送消息完,無論結果了,若是發送失敗也就丟失了。
producer發送消息完,只等待lead寫入成功就返回了,leader crash了,這時follower沒來及同步,消息丟失。
容許選舉ISR之外的副本做爲leader,會致使數據丟失,默認爲false。producer發送異步消息完,只等待lead寫入成功就返回了,leader crash了,這時ISR中沒有follower,leader從OSR中選舉,由於OSR中原本落後於Leader形成消息丟失。
producer發送消息完,等待ollower同步完再返回,若是異常則重試。這是副本的數量可能影響吞吐量,最大不超過5個,通常三個足夠了。
不容許選舉ISR之外的副本做爲leader。
當producer將acks設置爲「all」(或「-1」)時,min.insync。副本指定必須確認寫操做成功的最小副本數量。若是不能知足這個最小值,則生產者將引起一個異常(要麼是NotEnoughReplicas,要麼是NotEnoughReplicasAfterAppend)。
當一塊兒使用時,min.insync.replicas和ack容許執行更大的持久性保證。一個典型的場景是建立一個複製因子爲3的主題,設置min.insync複製到2個,用「all」配置發送。將確保若是大多數副本沒有收到寫操做,則生產者將引起異常。
producer發送消息,會自動重試,遇到不可恢復異常會拋出,這時能夠捕獲異常記錄到數據庫或緩存,進行單獨處理。
數據消費完沒有及時提交offset到broke。
消息消費端在消費過程當中掛掉沒有及時提交offset到broke,另外一個消費端啓動拿以前記錄的offset開始消費,因爲offset的滯後性可能會致使新啓動的客戶端有少許重複消費。
每次消費完或者程序退出時手動提交。這可能也無法保證一條重複。
通常的解決方案是讓下游作冪等或者儘可能每消費一條消息都記錄offset,對於少數嚴格的場景可能須要把offset或惟一ID,例如訂單ID和下游狀態更新放在同一個數據庫裏面作事務來保證精確的一次更新或者在下游數據表裏面同時記錄消費offset,而後更新下游數據的時候用消費位點作樂觀鎖拒絕掉舊位點的數據更新。