在上一篇 Kafka使用Java實現數據的生產和消費demo 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。html
在上一篇kafka的consumer消費者,咱們使用的是自動提交offset下標。
可是offset下標自動提交其實在不少場景都不適用,由於自動提交是在kafka拉取到數據以後就直接提交,這樣很容易丟失數據,尤爲是在須要事物控制的時候。
不少狀況下咱們須要從kafka成功拉取數據以後,對數據進行相應的處理以後再進行提交。如拉取數據以後進行寫入mysql這種 , 因此這時咱們就須要進行手動提交kafka的offset下標。mysql
這裏順便說下offset具體是什麼。
offset:指的是kafka的topic中的每一個消費組消費的下標。
簡單的來講就是一條消息對應一個offset下標,每次消費數據的時候若是提交offset,那麼下次消費就會從提交的offset加一那裏開始消費。
好比一個topic中有100條數據,我消費了50條而且提交了,那麼此時的kafka服務端記錄提交的offset就是49(offset從0開始),那麼下次消費的時候offset就從50開始消費。linux
說了這麼,那麼咱們開始進行手動提交測試。
首先,使用kafka 的producer 程序往kafka集羣發送了100條測試數據。git
程序打印中已經成功發送了,這裏咱們在kafka服務器使用命令中來查看是否成功發送.
命令以下:github
kafka-console-consumer.sh --zookeeper master:2181 --topic KAFKA_TEST2 --from-beginning
注:
1.master 是我在linux中作了IP映射的關係,實際能夠換成IP。
2.由於kafka是集羣,因此也能夠在集羣的其餘機器進行消費。sql
能夠看到已經成功發送了100條。數據庫
成功發送消息以後,咱們再使用kafka的consumer 進行數據消費。服務器
由於是用來測試手動提交
因此 將 enable.auto.commit 改爲 false 進行手動提交
而且設置每次拉取最大10條測試
props.put("enable.auto.commit", "false"); props.put("max.poll.records", 10);
將提交方式改爲false以後
須要手動提交只需加上這段代碼spa
consumer.commitSync();
那麼首先嚐試消費不提交,測試能不能重複消費。
右鍵運行main方法進行消費,不提交offset下標。
成功消費以後,結束程序,再次運行main方法進行消費,也不提交offset下標。
並未手動進行提交,並且並未更改消費組名,可是能夠看到已經重複消費了!
接下來,開始測試手動提交。
爲了達到上述目的,咱們測試只需添加以下代碼便可:
if(list.size()==50){ consumer.commitSync(); }
更改代碼以後,開始運行程序
測試示例圖以下:
簡單的一看,和以前未提交的同樣,貌似沒有什麼問題。
可是正常來講,未提交的下標不該該重複進行消費,直到它提交爲止嗎?
由於要進行重複消費,可是messageNo 會一直累加,只會手動的提交前50條offset,
後面的50條offset會一直沒法消費,因此打印的條數不該該是100,而是應該一直打印。
那麼測試的結果和預想的爲何不一致呢?
以前不是已經測試過能夠重複消費未提交的offset嗎?
其實這點能夠根據兩次啓動方式的不一樣而得出結論。
開始測試未提交重複消費的時候,實際我是啓動-暫停-啓動,那麼本地的consumer實際是被初始化過兩次。
而剛剛測試的實際consumer只有初始化一次。
至於爲何初始化一次就不行呢?
由於kafka的offset下標的記錄實際會有兩份,服務端會本身記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了,可是本地的並不會所以而改變offset進行再次消費。
簡單的來講假若有10條數據,在第5條的時候進行提交了offset下標,那麼服務端就知道該組消費的下標到第5條了,若是同組其餘的consumer進行消費的時候就會從第6條開始進行消費。可是本地的消費者客戶端並不會所以而改變,它仍是會繼續消費下去,並不會再次從第6條開始消費,因此會出現上圖狀況。
可是項目中運行以後,是不會所以而重啓的,因此這時咱們能夠換一種思路。
就是若是觸發某個條件,因此致使offset未提交,咱們就能夠關閉以前的consumer,而後新new一個consumer,這樣就能夠再次進行消費了! 固然配置要和以前的同樣。
那麼將以前的提交代碼更改以下:
if(list.size()==50){ consumer.commitSync(); }else if(list.size()>50){ consumer.close(); init(); list.clear(); list2.clear(); }
注:這裏由於是測試,爲了簡單明瞭,因此條件我寫的很簡單。實際狀況請根據我的的爲準。
示例圖以下:
說明:
1.由於每次是拉取10條,因此在60條的時候kafka的配置初始化了,而後又重新拉取了50-60條的數據,可是沒有提交,因此並不會影響實際結果。
2.這裏爲了方便截圖展現,因此打印條件改了,可是不影響程序!
從測試結果中,咱們達到了以前想要測試的目的,未提交的offset能夠重複進行消費。
這種作法通常也能夠知足大部分需求。
例如從kafka獲取數據入庫,若是一批數據入庫成功,就提交offset,不然不提交,而後再次拉取。
可是這種作法並不能最大的保證數據的完整性。好比在運行的時候,程序掛了之類的。
因此還有一種方法是手動的指定offset下標進行獲取數據,直到kafka的數據處理成功以後,將offset記錄下來,好比寫在數據庫中。那麼這種作法,等到下一篇再進行嘗試吧!
該項目我放在github上了,有興趣的能夠看看!
地址:https://github.com/xuwujing/kafka
到此,本文結束,謝謝閱讀!