Kafka重複消費緣由 redis
底層根本緣由:已經消費了數據,可是offset沒提交。 服務器
緣由1:強行kill線程,致使消費後的數據,offset沒有提交。 分佈式
緣由2:設置offset爲自動提交,關閉kafka時,若是在close以前,調用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啓會重複消費。例如: spa
try { 線程
consumer.unsubscribe(); 內存
} catch (Exception e) { kafka
} it
try { io
consumer.close(); 集羣
} catch (Exception e) {
}
上面代碼會致使部分offset沒提交,下次啓動時會重複消費。
Kafka Consumer丟失數據緣由
猜想:設置offset爲自動定時提交,當offset被自動定時提交時,數據還在內存中未處理,此時恰好把線程kill掉,那麼offset已經提交,可是數據未處理,致使這部份內存中的數據丟失。
記錄offset和恢復offset的方案
理論上記錄offset,下一個group consumer能夠接着記錄的offset位置繼續消費。
offset記錄方案:
每次消費時更新每一個topic+partition位置的offset在內存中,
Map<key, value>,key=topic+'-'+partition,value=offset
當調用關閉consumer線程時,把上面Map的offset數據記錄到 文件中*(分佈式集羣可能要記錄到redis中)。
下一次啓動consumer,須要讀取上一次的offset信息,方法是 以當前的topic+partition爲key,從上次的Map中去尋找offset。
而後使用consumer.seek()方法指定到上次的offset位置。
說明:
一、該方案針對單臺服務器比較簡單,直接把offset記錄到本地文件中便可,可是對於多臺服務器集羣,offset也要記錄到同一個地方,而且須要作去重處理。
若是線上程序是由多臺服務器組成的集羣,是否能夠用一臺服務器來支撐?應該能夠,只是消費慢一點,沒多大影響。
二、如何保證接着offset消費的數據正確性
爲了確保consumer消費的數據必定是接着上一次consumer消費的數據,
consumer消費時,記錄第一次取出的數據,將其offset和上次consumer最後消費的offset進行對比,若是相同則繼續消費。若是不一樣,則中止消費,檢查緣由。