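When a Spark job consumes data from a Kafka queue, the Kafka offsets are recorded through ZooKeeper. Sometimes the data volume is large and the job cannot keep up, and at that point the Kafka offset needs to be changed.
At first I tried changing the offset with Kafka's built-in class kafka.tools.UpdateOffsetsInZK, as follows: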
[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitions
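However, after restarting the Spark job, it turned out this had not worked. It then occurred to me that the offset stored in ZooKeeper for this consumer group id is what should be updated:
Before doing so, check the minimum and maximum offsets of the topic. From the Kafka directory:
Check the minimum: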
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 --topic tam_format_alarm --time -2
Result:
Check the maximum:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 --topic tam_format_alarm --time -1
Result:
Set the Kafka offset to a value within the range between the minimum and the maximum.
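The range check can be sketched as a small shell helper. This is only an illustration, not part of Kafka: the function name clamp_offset is hypothetical, and the minimum of 0 in the example is a made-up value, since the actual GetOffsetShell results are not shown here. The maximum 6776033 is the partition 0 value printed earlier.

```shell
# clamp_offset MIN MAX WANTED   (hypothetical helper, not a Kafka tool)
# Prints WANTED if it lies within [MIN, MAX]; otherwise prints the nearer bound.
clamp_offset() {
  local min=$1 max=$2 want=$3
  if [ "$want" -lt "$min" ]; then
    echo "$min"
  elif [ "$want" -gt "$max" ]; then
    echo "$max"
  else
    echo "$want"
  fi
}

# Example: a requested offset beyond the maximum falls back to the maximum.
# The minimum (0) is an assumed value for illustration only.
clamp_offset 0 6776033 9999999
```

Checking each value this way before writing it to ZooKeeper avoids pointing the consumer at an offset the brokers no longer hold or have not yet reached.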
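First go to the ZooKeeper installation directory, enter its bin directory, and run ./zkCli.sh to open the client shell:
Use the following command to set the offset for consumer group bsatam.enhance_alarm, topic tam_format_alarm, partition 0 to 6776033: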
set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033
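Set the remaining partitions the same way. The command for partitions 1-4 is identical except for the last two values: the partition number at the end of the path and the offset.
For example, the maximum offset of partition 4 is 6780299. To move the offset to the maximum, the command is: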
set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299
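Typing one set command per partition is repetitive, so the commands could be generated instead. The sketch below assumes GetOffsetShell prints one topic:partition:offset line per partition (the actual output is not shown in this post, so treat that format as an assumption), and the function name gen_zk_set is hypothetical. It only prints the commands; they still have to be pasted into zkCli.sh.

```shell
# gen_zk_set GROUP   (hypothetical helper)
# Reads "topic:partition:offset" lines on stdin and prints one zkCli
# "set" command per line, using the /consumers/<group>/offsets/<topic>/<partition>
# path layout shown in the commands above.
gen_zk_set() {
  local group=$1
  while IFS=: read -r topic partition offset; do
    # Skip blank or malformed lines that lack an offset field.
    [ -n "$offset" ] || continue
    echo "set /consumers/$group/offsets/$topic/$partition $offset"
  done
}

# Example input uses the partition 0 and partition 4 offsets from this post.
printf '%s\n' \
  'tam_format_alarm:0:6776033' \
  'tam_format_alarm:4:6780299' | gen_zk_set enhance_alarm
```

In practice the input would come from the GetOffsetShell command with --time -1, provided its output matches the assumed format.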
After adjusting all 5 partitions, I restarted the job and it ran normally.