若是你在使用Kafka來分發消息,在數據處理的過程當中可能會出現處理程序出異常或者是其它的錯誤,會形成數據丟失或不一致。這個時候你也許會想要經過kafka把數據重新處理一遍,咱們知道kafka默認會在磁盤上保存到7天的數據,你只須要把kafka的某個topic的consumer的offset設置爲某個值或者是最小值,就能夠使該consumer從你設置的那個點開始消費。html
用下面命令能夠查詢到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:spa
$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -2
輸出code
DynamicRange:0:1288
查詢offset的最大值:htm
$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -1
輸出blog
DynamicRange:0:7885
從上面的輸出能夠看出topic:DynamicRange只有一個partition:0 offset範圍爲:[1288,7885]get
啓動zookeeper clientkafka
$ /opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh
經過下面命令設置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset爲1288:it
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
注意若是你的kafka設置了zookeeper root,好比爲/kafka,那麼命令應該改成:io
set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
重啓相關的應用程序,就能夠從設置的offset開始讀數據了。 ast