修改kafka topic offset 的方法

 

在spark JOB中消費kafka隊列數據時,經過zookeeper記錄了kafka的偏移量,有時數據量較大,JOB處理不過來,這事須要kafka修改偏移量offset,如:spa

 

 

開始嘗試調用kafka內置的類kafka.tools.UpdateOffsetsInZK,修改offset,以下:調試

[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitionsorm

可是重啓spark JOB以後,發現並不成功。忽然想到應該跟新zookeeper中該消費group id的偏移量:blog

操做以前先查看下topic offset的最大值和最小值,進入kafka目錄:隊列

查看最小值:kafka

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -2it

結果:spark

 

查看最大值:io

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -1form

結果:

 

根據最大值最小值區間,設置kafka的offset。

先進入zookeeper安裝目錄,進入bin目錄,執行./zkCli.sh命令,進入終端:

經過下面命令設置consumer group:bsatam.enhance_alarm topic:tam_format_alarm partition:1  offset 爲 6776033:

set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033

 

 

一樣,設置其他的partition,partition 1-4 設置命令同樣,須要修改partiton修改下最後面兩個參數的值:

如partition 4的最大值是6780299,如今須要將offset 調爲最大,即命令爲:

set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299

 

 調試完5個partition後,重啓JOB,運行正常:

相關文章
相關標籤/搜索