重置kafka的offset

若是你在使用Kafka來分發消息,在數據處理的過程當中可能會出現處理程序出異常或者是其它的錯誤,會形成數據丟失或不一致。這個時候你也許會想要經過kafka把數據重新處理一遍,咱們知道kafka默認會在磁盤上保存到7天的數據,你只須要把kafka的某個topic的consumer的offset設置爲某個值或者是最小值,就能夠使該consumer從你設置的那個點開始消費。html

 

查詢topic的offset的範圍

用下面命令能夠查詢到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:spa

$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -2

輸出code

DynamicRange:0:1288

查詢offset的最大值:htm

$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -1

輸出blog

DynamicRange:0:7885

從上面的輸出能夠看出topic:DynamicRange只有一個partition:offset範圍爲:[1288,7885]get

 

設置consumer group的offset

啓動zookeeper clientkafka

$ /opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh

經過下面命令設置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset爲1288:it

set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288

注意若是你的kafka設置了zookeeper root,好比爲/kafka,那麼命令應該改成:io

set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288

 

生效 

重啓相關的應用程序,就能夠從設置的offset開始讀數據了。 ast

 

參考:https://metabroadcast.com/blog/resetting-kafka-offsets

轉載請註明出處:http://www.cnblogs.com/keitsi/p/5685576.html

相關文章
相關標籤/搜索