List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.
Option                                  Description
------                                  -----------
--all-topics                            Consider all topics assigned to a
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED (for consumer groups based on
  connect to>                             the new consumer): The server to
                                          connect to.
--by-duration <String: duration>        Reset offsets to offset by duration
                                          from current timestamp. Format:
                                          'PnDTnHnMnS'
--command-config <String: command       Property file containing configs to be
  config property file>                   passed to Admin Client and Consumer.
--delete                                Pass in groups to delete topic
                                          partition offsets and ownership
                                          information over the entire consumer
                                          group. For instance --group g1
                                          --group g2
                                        Pass in groups with a single topic to
                                          just delete the given topic's
                                          partition offsets and ownership
                                          information for the given consumer
                                          groups. For instance --group g1
                                          --group g2 --topic t1
                                        Pass in just a topic to delete the
                                          given topic's partition offsets and
                                          ownership information for every
                                          consumer group. For instance --topic
                                          t1
                                        WARNING: Group deletion only works for
                                          old ZK-based consumer groups, and
                                          one has to use it carefully to only
                                          delete groups that are not active.
--describe                              Describe consumer group and list
                                          offset lag (number of messages not
                                          yet processed) related to given
                                          group.
--execute                               Execute operation. Supported
                                          operations: reset-offsets.
--export                                Export operation execution to a CSV
                                          file. Supported operations:
                                          reset-offsets.
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV
                                          file.
--group <String: consumer group>        The consumer group we wish to act on.
--list                                  List all consumer groups.
--new-consumer                          Use the new consumer implementation.
                                          This is the default, so this option
                                          is deprecated and will be removed in
                                          a future release.
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                        Has 3 execution options: (default) to
                                          plan which offsets to reset,
                                          --execute to execute the
                                          reset-offsets process, and --export
                                          to export the results to a CSV
                                          format.
                                        Has the following scenarios to choose:
                                          --to-datetime, --by-period,
                                          --to-earliest, --to-latest,
                                          --shift-by, --from-file,
                                          --to-current. One scenario must be
                                          choose
                                        To define the scope use: --all-topics
                                          or --topic. One scope must be
                                          choose, unless you use '--from-file'
                                          scenario
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset
                                          by 'n', where 'n' can be positive or
                                          negative
--timeout <Long: timeout (ms)>          The timeout that can be set for some
                                          use cases. For example, it can be
                                          used when describing the group to
                                          specify the maximum amount of time
                                          in milliseconds to wait before the
                                          group stabilizes (when the group is
                                          just created, or is going through
                                          some changes). (default: 5000)
--to-current                            Reset offsets to current offset.
--to-datetime <String: datetime>        Reset offsets to offset from datetime.
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                           Reset offsets to earliest offset.
--to-latest                             Reset offsets to latest offset.
--to-offset <Long: offset>              Reset offsets to a specific offset.
--topic <String: topic>                 The topic whose consumer group
                                          information should be deleted or
                                          topic whose should be included in
                                          the reset offset process. In
                                          `reset-offsets` case, partitions can
                                          be specified using this format:
                                          `topic1:0,1,2`, where 0,1,2 are the
                                          partition to be included in the
                                          process. Reset-offsets also supports
                                          multiple topic inputs.
--zookeeper <String: urls>              REQUIRED (for consumer groups based on
                                          the old consumer): The connection
                                          string for the zookeeper connection
                                          in the form host:port. Multiple URLS
                                          can be given to allow fail-over.
1. Be sure not to forget the --execute option. Without it, the tool only plans the reset and leaves the committed offsets unchanged.
2. Before modifying consumer offsets, every consumer in the consumer group (group.id) must be stopped; only then can the offsets be reset.
3. If you do not want to stop all consumers (more often, you cannot) but still need to reset offsets, you can do it with KafkaConsumer.seek().
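Since a forgotten --execute turns a reset into a silent dry run, one option is to make both steps explicit. The wrapper below is only a sketch: `plan_then_execute`, `KAFKA_CG`, and `CONFIRM` are names invented for this example, not part of Kafka. It runs the plan first and adds --execute only when asked to.

```shell
# Hypothetical wrapper: preview a reset, then apply it only when CONFIRM=yes.
# KAFKA_CG can be overridden (for example with `echo`) to inspect the commands.
KAFKA_CG="${KAFKA_CG:-kafka-consumer-groups}"

plan_then_execute() {
  # usage: plan_then_execute <server:port> <group> <scope and scenario options...>
  server=$1
  group=$2
  shift 2

  # Without --execute the tool only prints the planned offsets.
  "$KAFKA_CG" --bootstrap-server "$server" --group "$group" \
    --reset-offsets "$@" || return 1

  # Commit the new offsets only on explicit confirmation.
  if [ "${CONFIRM:-no}" = yes ]; then
    "$KAFKA_CG" --bootstrap-server "$server" --group "$group" \
      --reset-offsets "$@" --execute
  fi
}
```

For example, `CONFIRM=yes plan_then_execute <server:port> consumer_group_reset_test --topic <topicName> --to-earliest` would print the plan and then apply it.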
--by-duration : The duration must be expressed in the format of PnDTnHnMnS, where nD, nH, nM, and nS indicate the numbers of days, hours, minutes, and seconds respectively. Each n must be a non-negative integer. For example, valid duration values are P1DT1H1M1S, P1D, PT1H1M, and P1DT1S; invalid duration values are -P1D, P1Y1M1D, or PT1H1M1.1S.
Every n in the expression must be replaced with a number.
> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --group consumer_group_reset_test --topic <topicName> --by-duration P1DT1H1M0S
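The duration rules above can be checked before calling the tool. This is a minimal sketch that encodes only the rules as stated in this post (days, hours, minutes, seconds; non-negative integers); `is_valid_duration` is a hypothetical helper name, and the tool's own parser may accept or reject other forms.

```shell
# Hypothetical helper: succeed only if $1 looks like PnDTnHnMnS
# with at least one component and integer values.
is_valid_duration() {
  case "$1" in
    P|*T) return 1 ;;   # a bare "P" or a trailing "T" has no component
  esac
  printf '%s\n' "$1" |
    grep -Eq '^P([0-9]+D)?(T([0-9]+H)?([0-9]+M)?([0-9]+S)?)?$'
}
```

With this definition, `is_valid_duration P1DT1H1M0S` succeeds, while `is_valid_duration P1Y1M1D` fails.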
--to-datetime <String: datetime> Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss', for example '2018-01-01T00:00:00.000'.
The milliseconds after the decimal point must be specified explicitly.
> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --to-datetime 2018-11-16T00:00:00.000 --group consumer_group_reset_test --topic <topic>
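Because the milliseconds are easy to leave out, a small helper can normalize the value before it is passed to --to-datetime. This is a sketch only: `with_millis` is a hypothetical name, and it does nothing about time zones.

```shell
# Hypothetical helper: append ".000" when the datetime has no milliseconds.
with_millis() {
  case "$1" in
    *.[0-9][0-9][0-9])
      printf '%s\n' "$1" ;;            # already has milliseconds
    *T[0-9][0-9]:[0-9][0-9]:[0-9][0-9])
      printf '%s.000\n' "$1" ;;        # add the missing milliseconds
    *)
      echo "unsupported datetime: $1" >&2
      return 1 ;;
  esac
}
```

It could then be used as `--to-datetime "$(with_millis 2018-11-16T00:00:00)"`.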
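--shift-by accepts a positive or negative value: a positive value moves the offsets forward (toward newer messages), a negative value moves them back.
> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --group consumer_group_reset_test --topic <topicName> --shift-by -100000 --execute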
> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --group consumer_group_reset_test --topic kafka_test_666:0,1,2 --shift-by -666 --execute
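The arithmetic behind --shift-by can be sketched as follows. The clamping to the partition's earliest and latest offsets is an assumption about how the tool treats out-of-range results, so check the planned output for your Kafka version; `shifted_offset` is a hypothetical helper used only for illustration.

```shell
# Hypothetical helper: new offset = current + shift, kept inside
# [earliest, latest] (the clamping is an assumption, see above).
shifted_offset() {
  # usage: shifted_offset <current> <shift> <earliest> <latest>
  new=$(( $1 + $2 ))
  if [ "$new" -lt "$3" ]; then new=$3; fi
  if [ "$new" -gt "$4" ]; then new=$4; fi
  printf '%s\n' "$new"
}
```

For a partition currently at offset 1000 with a valid range of 0 to 5000, `shifted_offset 1000 -666 0 5000` gives 334.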
--from-file
> kafka-consumer-groups --bootstrap-server <server:port> --reset-offsets --from-file /tmp/topicAndOffset.csv --group consumer_group_reset_test --execute
File contents:
Format: [topic],[partition],[target-offset]
kafka_test_666,0,6666
kafka_test_999,0,9999
kafka_test_666,1,8888
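A malformed row is easier to catch before the file reaches --from-file. The check below is a rough sketch that assumes exactly the three-column layout shown above, with a numeric partition and offset; `check_offsets_csv` is a hypothetical helper name.

```shell
# Hypothetical helper: fail if any row is not topic,partition,offset.
check_offsets_csv() {
  awk -F, '
    NF != 3 || $1 == "" || $2 !~ /^[0-9]+$/ || $3 !~ /^[0-9]+$/ {
      printf "line %d is not topic,partition,offset: %s\n", NR, $0 > "/dev/stderr"
      bad = 1
    }
    END { exit bad }
  ' "$1"
}
```

Running `check_offsets_csv /tmp/topicAndOffset.csv` before the reset command reports the offending lines on standard error and returns a non-zero status if any row is invalid.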
For reference, see the official documentation.