kafka最好用的腳本二:kafka-consumer-groups

kafka-consumer-groups 使用手冊

官方說明:

List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.
Option                                  Description
------                                  -----------
--all-topics                            Consider all topics assigned to a
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED (for consumer groups based on
  connect to>                             the new consumer): The server to
                                          connect to.
--by-duration <String: duration>        Reset offsets to offset by duration
                                          from current timestamp. Format:
                                          'PnDTnHnMnS'
--command-config <String: command       Property file containing configs to be
  config property file>                   passed to Admin Client and Consumer.
--delete                                Pass in groups to delete topic
                                          partition offsets and ownership
                                          information over the entire consumer
                                          group. For instance --group g1 --
                                          group g2
                                        Pass in groups with a single topic to
                                          just delete the given topic's partition offsets and ownership information for the given consumer groups. For instance --group g1 -- group g2 --topic t1 Pass in just a topic to delete the given topic's partition offsets and
                                          ownership information for every
                                          consumer group. For instance --topic
                                          t1
                                        WARNING: Group deletion only works for
                                          old ZK-based consumer groups, and
                                          one has to use it carefully to only
                                          delete groups that are not active.
--describe                              Describe consumer group and list
                                          offset lag (number of messages not
                                          yet processed) related to given
                                          group.
--execute                               Execute operation. Supported
                                          operations: reset-offsets.
--export                                Export operation execution to a CSV
                                          file. Supported operations: reset-
                                          offsets.
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV
                                          file.
--group <String: consumer group>        The consumer group we wish to act on.
--list                                  List all consumer groups.
--new-consumer                          Use the new consumer implementation.
                                          This is the default, so this option
                                          is deprecated and will be removed in
                                          a future release.
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                        Has 3 execution options: (default) to
                                          plan which offsets to reset, --
                                          execute to execute the reset-offsets
                                          process, and --export to export the
                                          results to a CSV format.
                                        Has the following scenarios to choose:
                                          --to-datetime, --by-period, --to-
                                          earliest, --to-latest, --shift-by, --
                                          from-file, --to-current. One
                                          scenario must be choose
                                        To define the scope use: --all-topics
                                          or --topic. . One scope must be
                                          choose, unless you use '--from-file'
                                          scenario
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset
                                          by 'n', where 'n' can be positive or
                                          negative
--timeout <Long: timeout (ms)>          The timeout that can be set for some
                                          use cases. For example, it can be
                                          used when describing the group to
                                          specify the maximum amount of time
                                          in milliseconds to wait before the
                                          group stabilizes (when the group is
                                          just created, or is going through
                                          some changes). (default: 5000)
--to-current                            Reset offsets to current offset.
--to-datetime <String: datetime>        Reset offsets to offset from datetime.
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                           Reset offsets to earliest offset.
--to-latest                             Reset offsets to latest offset.
--to-offset <Long: offset>              Reset offsets to a specific offset.
--topic <String: topic>                 The topic whose consumer group
                                          information should be deleted or
                                          topic whose should be included in
                                          the reset offset process. In `reset-
                                          offsets` case, partitions can be
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the
                                          partition to be included in the
                                          process. Reset-offsets also supports
                                          multiple topic inputs.
--zookeeper <String: urls>              REQUIRED (for consumer groups based on
                                          the old consumer): The connection
                                          string for the zookeeper connection
                                          in the form host:port. Multiple URLS
                                          can be given to allow fail-over.
複製代碼

使用方法

注意點

一、ios

注意,必定不要忘記加 --execute 選項。 注意,必定不要忘記加 --execute 選項。 注意,必定不要忘記加 --execute 選項。express

二、apache

在修改consumer offset時,須要將該consumer group.id下全部的consumer中止運行才能夠進行重置offset的操做。bootstrap

三、bash

若是不想中止全部的consumer(更多的狀況下是不能),而且還想重置offset,那麼能夠經過KafkaConsumer.seek()來實現offset重置。less

offset 相對偏移(跨時間)

--by-duration : The duration must be expressed in the format of PnDTnHnMnS, where nD, nH, nM, and nS indicate the numbers of days, hours, minutes, and seconds respectively. Each n must be a non-negative integer. For example, valid duration values are P1DT1H1M1S, P1D, PT1H1M, and P1DT1S; invalid duration values are -P1D, P1Y1M1D, or PT1H1M1.1S.
須要將表達式中的n所有替換成數字.

> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets  --group consumer_group_reset_test --topic <topicName> --by-duration P1DT1H1M0S
複製代碼

offset 指向某時間節點

--to-datetime <String: datetime>        Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss' , 例如 : '2018-01-01T00:00:00.000'
小數點後面的毫秒必須顯示指定.

> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --to-datetime  2018-11-16T00:00:00.000 --group consumer_group_reset_test --topic <topic>
複製代碼

offset 相對偏移

--shift-by  可正可負,正則向後移動,負數則向前移動.

kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets  --group consumer_group_reset_test --topic <topicName> --shift-by -100000 --execute

> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets  --group consumer_group_reset_test --topic kafka_test_666:0,1,2 --shift-by -666 --execute
複製代碼

批量修改 offset

--from-file 

> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --from-file /tmp/topicAndOffset.csv --group consumer_group_reset_test --execute

文件內容:
格式[topic],[partition],[taeget-offset]
kafka_test_666,0,6666
kafka_test_999,0,9999
kafka_test_666,1,8888
複製代碼

能夠參考 : 官方文檔ide

相關文章
相關標籤/搜索