The most frequently used commands are:
kafka-server-start.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-topics.sh
Of these, the first is used only to start Kafka, the two console commands are mostly used for testing, and the last one is the most versatile, so the discussion below focuses mainly on kafka-topics.sh.
kafka-server-start.sh
Usage: > bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
This command accepts several arguments. The first is optional and runs the broker as a background (daemon) service; the second must be a Kafka configuration file. These can be followed by any number of --override arguments, where property can be any of the parameters listed under Broker Configs; these extra arguments override the corresponding settings in the configuration file.
For example, the following starts multiple brokers from the same configuration file by overriding parameters:
> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1 --override listeners=PLAINTEXT://:9092 --override advertised.listeners=PLAINTEXT://192.168.16.150:9092
> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1 --override log.dirs=/tmp/kafka-logs-2 --override listeners=PLAINTEXT://:9093 --override advertised.listeners=PLAINTEXT://192.168.16.150:9093
This usage is for demonstration only; to really run multiple brokers, you should create a separate server.properties file for each broker.
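For illustration, a minimal sketch of what such a per-broker file might contain. The file name config/server-1.properties is a hypothetical choice, and the values simply mirror the overrides used for the second broker above:

# config/server-1.properties -- hypothetical per-broker config; adjust paths and addresses for your environment
broker.id=1
log.dirs=/tmp/kafka-logs-2
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://192.168.16.150:9093

The broker can then be started without any overrides:

> bin/kafka-server-start.sh -daemon config/server-1.properties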
kafka-console-consumer.sh
This command simply writes consumed messages to standard output. It supports the following options.
--blacklist <String: blacklist>
    Blacklist of topics to exclude from consumption.
--bootstrap-server <String: server to connect to>
    REQUIRED (unless old consumer is used): The server to connect to.
--consumer-property <String: consumer_prop>
    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>
    Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--csv-reporter-enabled
    If set, the CSV metrics reporter will be enabled.
--delete-consumer-offsets
    If specified, the consumer path in zookeeper is deleted when starting up.
--enable-systest-events
    Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>
    The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning
    If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>
    The maximum number of messages to consume before exiting. If not set, consumption is continual.
--metrics-dir <String: metrics directory>
    If csv-reporter-enable is set, and this parameter is set, the csv metrics will be output here.
--new-consumer
    Use the new consumer implementation. This is the default.
--offset <String: consume offset>
    The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end. (default: latest)
--partition <Integer: partition>
    The partition to consume from.
--property <String: prop>
    The properties to initialize the message formatter.
--skip-message-on-error
    If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>
    If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>
    The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>
    Whitelist of topics to include for consumption.
--zookeeper <String: urls>
    REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
--bootstrap-server must be specified, and you usually also specify --topic to choose which topic to read. To read messages from the beginning, add the --from-beginning flag. A typical invocation looks like this.
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
You can also consume from a specific partition with the following command:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0
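If you also want to see each message's key and timestamp, the default formatter (kafka.tools.DefaultMessageFormatter) accepts --property switches for that; a sketch:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true --property print.timestamp=true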
kafka-console-producer.sh
This command sends the contents of a file or of standard input to a Kafka cluster. Its options are as follows.
--batch-size <Integer: size>
    Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>
    REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]
    The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'. If specified without value, then it defaults to 'gzip'.
--key-serializer <String: encoder_class>
    The class name of the message encoder implementation to use for serializing keys. (default: kafka.serializer.DefaultEncoder)
--line-reader <String: reader_class>
    The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>
    The max time that the producer will block for during a send request. (default: 60000)
--max-memory-bytes <Long: total memory in bytes>
    The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>
    The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>
    Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>
    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--old-producer
    Use the old producer implementation.
--producer-property <String: producer_prop>
    A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>
    Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>
    A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--queue-enqueuetimeout-ms <Integer: queue enqueuetimeout ms>
    Timeout for event enqueue. (default: 2147483647)
--queue-size <Integer: queue_size>
    If set and the producer is running in asynchronous mode, this gives the maximum amount of messages that will queue awaiting sufficient batch size. (default: 10000)
--request-required-acks <String: request required acks>
    The required acks of the producer requests. (default: 1)
--request-timeout-ms <Integer: request timeout ms>
    The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)
--retry-backoff-ms <Integer>
    Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>
    The size of the tcp RECV buffer. (default: 102400)
--sync
    If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>
    If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>
    REQUIRED: The topic id to produce messages to.
--value-serializer <String: encoder_class>
    The class name of the message encoder implementation to use for serializing values. (default: kafka.serializer.DefaultEncoder)
Of these, --broker-list and --topic are the two required parameters.
Common usage is shown below.
Reading from standard input:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Reading from a file:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt
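The default line reader can also split a key out of each input line. A sketch using its parse.key and key.separator properties (the colon separator is an arbitrary choice):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=:

With this, typing a line such as user1:hello sends a message with key user1 and value hello.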
kafka-topics.sh
Compared with the commands above, which are only used occasionally, kafka-topics.sh is the important one. It takes the following options.
Create, delete, describe, or change a topic.

--alter
    Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>
    A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.message.bytes
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
    See the Kafka documentation for full details on the topic configs.
--create
    Create a new topic.
--delete
    Delete a topic.
--delete-config <String: name>
    A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe
    List details for the given topics.
--disable-rack-aware
    Disable rack aware replica assignment.
--force
    Suppress console prompts.
--help
    Print usage information.
--if-exists
    If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists
    If set when creating topics, the action will only execute if the topic does not already exist.
--list
    List all available topics.
--partitions <Integer: # of partitions>
    The number of partitions for the topic being created or altered. (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected.)
--replica-assignment <String: broker_id_for_part1_replica1:broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
    A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
    The replication factor for each partition in the topic being created.
--topic <String: topic>
    The topic to be created, altered or described. Can also accept a regular expression except for the --create option.
--topics-with-overrides
    If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions
    If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions
    If set when describing topics, only show under replicated partitions.
--zookeeper <String: urls>
    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
Below are a few common topic commands.
Describe a topic's configuration
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
Set the retention time
# Deprecated way
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --config retention.ms=1000
# Modern way
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
If you need to delete all messages in a topic, you can exploit the retention time: first set the retention time very low (1000 ms), wait a few seconds, then restore it to its previous value.
Note: the default retention time is 7 days (log.retention.hours=168).
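A sketch of the full purge sequence. The topic name test_topic comes from above; the restored value of 604800000 ms (7 days) and the 60-second wait are example choices, so substitute your topic's actual previous setting and allow for the broker's retention check interval:

# 1. Shrink retention so existing messages expire
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
# 2. Wait for the broker to delete the expired segments (the check runs periodically, per log.retention.check.interval.ms)
sleep 60
# 3. Restore the previous retention value
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=604800000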
Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
Note: delete.topic.enable=true must be set in the broker's server.properties configuration file before topics can actually be deleted.
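Instead of editing the file, you could also pass this setting at startup via the --override mechanism shown earlier; a sketch:

> bin/kafka-server-start.sh -daemon config/server.properties --override delete.topic.enable=true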
Show topic details
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
Add partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
The topic commands above are based on: http://ronnieroller.com/kafka/cheat-sheet
With so many commands, how do you remember them all?
Kafka's command-line tools provide very rich help messages, so you only need to remember the rough usages above and know where to start; when you need a particular command, let the tool's prompts guide you.
For example, how do you use kafka-configs.sh to view a topic's configuration?
First, run bin/kafka-configs.sh in a shell; it prints the following usage information.
Add/Remove entity config for a topic, client, user or broker

--add-config <String>
    Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations:
    For entity_type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.message.bytes
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
    For entity_type 'brokers':
        follower.replication.throttled.rate
        leader.replication.throttled.rate
    For entity_type 'users':
        producer_byte_rate
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
    For entity_type 'clients':
        producer_byte_rate
        consumer_byte_rate
    Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user.
--alter
    Alter the configuration for the entity.
--delete-config <String>
    Config keys to remove: 'k1,k2'.
--describe
    List configs for the given entity.
--entity-default
    Default entity name for clients/users (applies to corresponding entity type in command line).
--entity-name <String>
    Name of entity (topic name/client id/user principal name/broker id).
--entity-type <String>
    Type of entity (topics/clients/users/brokers).
--force
    Suppress console prompts.
--help
    Print usage information.
--zookeeper <String: urls>
    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
The first line shows that this command can modify the configuration of a topic, client, user or broker.
To work on a topic, set entity-type to topics and enter the following:
> bin/kafka-configs.sh --entity-type topics
Command must include exactly one action: --describe, --alter
The tool complains that exactly one action must be specified; here the choices are the two actions listed. Try adding --describe:
> bin/kafka-configs.sh --entity-type topics --describe
Missing required argument "[zookeeper]"
Keep going and add --zookeeper:
> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Since no topic name was given, this shows the configuration of __consumer_offsets. Now specify a topic:
> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181 --entity-name test
Configs for topic 'test' are
Now the configuration of the test topic is shown, which happens to be empty here.
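To close the loop, a sketch of setting an override and describing the topic again; max.message.bytes=1048576 is just an example config from the list in the usage output above:

> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config max.message.bytes=1048576
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test

After the --alter call, the --describe output should list the new override for the test topic.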
Thanks to Kafka's thorough command-line help, it is easy to work out the next step from the prompts; once you are used to this, you can quickly arrive at the exact command you want.