Common Apache Kafka Operations Commands
Author: 尹正傑 (Yin Zhengjie)
Copyright notice: this is an original work; reproduction without permission is prohibited and will be pursued legally.
I. Commands for managing the Kafka service
1>. Starting the Kafka service
[root@node106.yinzhengjie.org.cn ~]# kafka-server-start.sh /home/softwares/kafka_2.11-0.10.2.1/config/server.properties >> /dev/null &
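The start script also has a built-in daemon mode, which can be used instead of the explicit redirection and backgrounding above (a sketch using the same config path):

[root@node106.yinzhengjie.org.cn ~]# kafka-server-start.sh -daemon /home/softwares/kafka_2.11-0.10.2.1/config/server.properties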
2>. Stopping the Kafka service (flushes logs to disk and migrates leadership of every partition led by the current broker)
[root@node106.yinzhengjie.org.cn ~]# kafka-server-stop.sh
Friendly reminder:
Note that when stopping brokers, if the partition replication factor is 2, do not stop 2 brokers at the same time. Also, in a production environment this script may take a while before the broker process fully exits; during that period, avoid using kill to forcibly terminate the process.
Graceful shutdown:
A Kafka broker can go down because of a failure, but much of the time it is brought down for planned maintenance. In that case the broker process should be shut down gracefully rather than simply killed. A graceful shutdown has two benefits:
1>. All logs are flushed to disk, so the log-recovery step can be skipped on restart. Log recovery is time-consuming, so this makes planned restarts faster.
2>. Before the broker shuts down, leadership of every partition it leads is migrated to replica brokers. This makes the leadership transfer faster and shrinks each partition's unavailability window to a few milliseconds.
The graceful-shutdown script, "kafka-server-stop.sh", lives in the bin directory under the Kafka installation directory; as you can see, it still uses kill internally. Note that the first benefit requires no extra configuration, while the second requires controlled.shutdown.enable=true; since that is the official default, the property normally does not need to be changed.
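For reference, a minimal sketch of the graceful-shutdown settings in server.properties (the values shown are the Kafka defaults, so normally nothing needs to be changed):

# server.properties (defaults shown)
controlled.shutdown.enable=true
# number of controlled-shutdown attempts before falling back to an unclean shutdown
controlled.shutdown.max.retries=3
# back-off between attempts, in milliseconds
controlled.shutdown.retry.backoff.ms=5000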
3>. Checking the Kafka process ID
[root@node106.yinzhengjie.org.cn ~]# jps | grep Kafka
16434 Kafka
[root@node106.yinzhengjie.org.cn ~]#
II. Usage examples for kafka-topics.sh
1>. Creating a topic (kafka-topics.sh)
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --create --partitions 10 --replication-factor 2 --topic yinzhengjie-kafka
Created topic "yinzhengjie-kafka".
[root@node106.yinzhengjie.org.cn ~]#
2>. Listing topics
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
yinzhengjie-kafka
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root   0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   0 Jul 11 15:45 replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  70 Jul 11 17:43 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  70 Jul 11 17:44 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-0
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-1
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-3

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root  48 Jul 11 17:43 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-0
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-8
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root   0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   0 Jul 11 15:45 replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-5
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-6

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-5
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-9
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  48 Jul 11 17:45 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-2
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-4

node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root   0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   0 Jul 11 15:45 replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 12
-rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  48 Jul 11 17:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  48 Jul 11 17:45 replication-offset-checkpoint
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-6
drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-7
[root@node106.yinzhengjie.org.cn ~]#
3>. Deleting a topic
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --delete --topic yinzhengjie-kafka
Topic yinzhengjie-kafka is marked for deletion.        #This tells us the "yinzhengjie-kafka" topic has been marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.        #For the deletion to actually happen, delete.topic.enable must be set to true. The actual deletion time also depends on log.retention.check.interval.ms, which controls how often the deletion policy is checked. For experiments I recommend setting it to 10000, i.e. 10 seconds; the cluster must be restarted for the change to take effect.
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
yinzhengjie-kafka - marked for deletion
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root 70 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 70 Jul 11 17:55 replication-offset-checkpoint
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint

node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 4
-rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint

node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
-rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint

node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
total 16
-rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
-rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
[root@node106.yinzhengjie.org.cn ~]#
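For reference, the broker settings mentioned in the deletion comments above, as a server.properties sketch (the 10-second check interval is only for experimentation; the default is 300000, i.e. 5 minutes):

# server.properties
delete.topic.enable=true
log.retention.check.interval.ms=10000    # experiment-only value; restart the cluster for it to take effect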
4>. Viewing details of an existing topic
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --create --zookeeper node106.yinzhengjie.org.cn:2181/kafka01 --partitions 5 --replication-factor 2 --topic yinzhengjie-kafka001
Created topic "yinzhengjie-kafka001".
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001    PartitionCount:5    ReplicationFactor:2    Configs:
    Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 108,106
    Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 108,107
    Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
[root@node106.yinzhengjie.org.cn ~]#
5>. Increasing the partition count and adding a topic-level config
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --partitions 10 --config max.message.bytes=10000000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "yinzhengjie-kafka001".
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:max.message.bytes=10000000        #Note: this is the setting we just customized with the --config parameter.
    Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 108,106
    Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 108,107
    Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 108,107
    Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 107,106
    Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 108,106
#Clearly, the partition count has grown from the previous 5 to 10.
[root@node106.yinzhengjie.org.cn ~]#
6>. Removing a topic-level config
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:max.message.bytes=10000000
    Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 106,108
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --delete-config max.message.bytes        #Delete the corresponding config entry.
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "yinzhengjie-kafka001".
[root@node106.yinzhengjie.org.cn ~]#
[root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:        #We can see the config entry has been deleted.
    Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 107,108
    Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
    Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 106,107
    Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 106,108
[root@node106.yinzhengjie.org.cn ~]#
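Since the warnings above note that altering topic configs via kafka-topics.sh is deprecated, here is a sketch of the equivalent add/delete operations with kafka-configs.sh (same topic and ZooKeeper chroot as above):

[root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --entity-type topics --entity-name yinzhengjie-kafka001 --add-config max.message.bytes=10000000
[root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --entity-type topics --entity-name yinzhengjie-kafka001 --delete-config max.message.bytes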
III. Kafka functional testing
1>. Starting a producer
[root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh --broker-list node108.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019
尹正傑到此一遊!
https://www.cnblogs.com/yinzhengjie/
[root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                                 Description
------                                                 -----------
--batch-size <Integer: size>                           Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>                    REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]        The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'. If specified without value, then it defaults to 'gzip'
--key-serializer <String: encoder_class>               The class name of the message encoder implementation to use for serializing keys. (default: kafka.serializer.DefaultEncoder)
--line-reader <String: reader_class>                   The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>               The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes>       The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>   The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>                   Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>   The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--old-producer                                         Use the old producer implementation.
--producer-property <String: producer_prop>            A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>                Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>                              A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--queue-enqueuetimeout-ms <Integer: queue enqueuetimeout ms>   Timeout for event enqueue (default: 2147483647)
--queue-size <Integer: queue_size>                     If set and the producer is running in asynchronous mode, this gives the maximum amount of messages will queue awaiting sufficient batch size. (default: 10000)
--request-required-acks <String: request required acks>   The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms>     The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>                           Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>                   The size of the tcp RECV size. (default: 102400)
--sync                                                 If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>                        If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>                                REQUIRED: The topic id to produce messages to.
--value-serializer <String: encoder_class>             The class name of the message encoder implementation to use for serializing values. (default: kafka.serializer.DefaultEncoder)
[root@node106.yinzhengjie.org.cn ~]#
2>. Starting a consumer
[root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh --bootstrap-server node107.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019
尹正傑到此一遊!
https://www.cnblogs.com/yinzhengjie/
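If the consumer is started only after the messages were produced, nothing is shown by default because consumption starts at the latest offset. The --from-beginning flag listed in the help below replays the topic from the earliest available offset (a sketch, same topic as above):

[root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh --bootstrap-server node107.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019 --from-beginning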
[root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                                 Description
------                                                 -----------
--blacklist <String: blacklist>                        Blacklist of topics to exclude from consumption.
--bootstrap-server <String: server to connect to>      REQUIRED (unless old consumer is used): The server to connect to.
--consumer-property <String: consumer_prop>            A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>                Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--csv-reporter-enabled                                 If set, the CSV metrics reporter will be enabled
--delete-consumer-offsets                              If specified, the consumer path in zookeeper is deleted when starting up
--enable-systest-events                                Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>                            The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                                       If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>                 The maximum number of messages to consume before exiting. If not set, consumption is continual.
--metrics-dir <String: metrics directory>              If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here
--new-consumer                                         Use the new consumer implementation. This is the default.
--offset <String: consume offset>                      The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>                       The partition to consume from.
--property <String: prop>                              The properties to initialize the message formatter.
--skip-message-on-error                                If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>                     If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>                                The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>                        Whitelist of topics to include for consumption.
--zookeeper <String: urls>                             REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
[root@node106.yinzhengjie.org.cn ~]#
IV. Other operations
1>. Leadership balancing
When a broker shuts down or fails, leadership of every partition it led migrates to replicas on other brokers. This means that after the broker restarts, all of its partitions will be followers only, i.e. no client will use them for reads or writes.

To avoid this imbalance, Kafka has the notion of a preferred replica. If a partition's replica list is 108,106,107, then node 108 is preferred as leader over 106 and 107, because it appears first in the list.

You can run kafka-preferred-replica-election.sh to make the cluster try to move leadership back to the preferred replicas; a concrete run is shown after the settings sketch below. Running this command by hand all the time is tedious, so you can let Kafka do it automatically by setting auto.leader.rebalance.enable=true; since that is already the default, no configuration change is normally needed.
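For reference, a sketch of the automatic-rebalance settings in server.properties (the values shown are the defaults; the other two knobs control how often the check runs and how much imbalance is tolerated):

# server.properties (defaults shown; no change normally required)
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10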
[root@node106.yinzhengjie.org.cn ~]# kafka-preferred-replica-election.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01
Created preferred replica election path with {"version":1,"partitions":[{"topic":"yinzhengjie-kafka2019","partition":5},{"topic":"__consumer_offsets","partition":34},{"topic":"__consumer_offsets","partition":36},{"topic":"__consumer_offsets","partition":27},{"topic":"yinzhengjie-kafka2019","partition":4},{"topic":"yinzhengjie-kafka","partition":15},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"yinzhengjie-kafka","partition":6},{"topic":"__consumer_offsets","partition":49},{"topic":"yinzhengjie-kafka2019","partition":11},{"topic":"test","partition":2},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"yinzhengjie-kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":13},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"yinzhengjie-kafka","partition":1},{"topic":"__consumer_offsets","partition":6},{"topic":"yinzhengjie-kafka2019","partition":7},{"topic":"yinzhengjie-kafka","partition":12},{"topic":"yinzhengjie-kafka","partition":16},{"topic":"__consumer_offsets","partition":37},{"topic":"yinzhengjie-kafka2019","partition":3},{"topic":"yinzhengjie-kafka001","partition":3},{"topic":"__consumer_offsets","partition":43},{"topic":"yinzhengjie-kafka001","partition":0},{"topic":"yinzhengjie-kafka","partition":11},{"topic":"test","partition":1},{"topic":"__consumer_offsets","partition":21},{"topic":"yinzhengjie-kafka2019","partition":0},{"topic":"yinzhengjie-kafka","partition":2},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"yinzhengjie-kafka","partition":0},{"topic":"yinzhengjie-kafka","partition":5},{"topic":"__consumer_offsets","partition":30},{"topic":"yinzhengjie-kafka001","partition":7},{"topic":"__consumer_offsets","partition":2},{"topic":"__consumer_offsets","partition":47},{"topic":"yinzhengjie-kafka2019","partition":17},{"topic":"__consumer_offsets","partition":25},{"topic":"__consumer_offsets","partition":29},{"topic":"kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":4},{"topic":"yinzhengjie-kafka2019","partition":16},{"topic":"yinzhengjie-kafka2019","partition":2},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"yinzhengjie-kafka2019","partition":12},{"topic":"yinzhengjie-kafka001","partition":8},{"topic":"test","partition":0},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"yinzhengjie-kafka001","partition":5},{"topic":"yinzhengjie-kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":8},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"kafka001","partition":1},{"topic":"__consumer_offsets","partition":38},{"topic":"yinzhengjie-kafka","partition":9},{"topic":"yinzhengjie-kafka","partition":13},{"topic":"__consumer_offsets","partition":44},{"topic":"__consumer_offsets","partition":10},{"topic":"yinzhengjie-kafka001","partition":1},{"topic":"__consumer_offsets","partition":3},{"topic":"yinzhengjie-kafka","partition":17},{"topic":"__consumer_offsets","partition":35},{"topic":"kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":1},{"topic":"yinzhengjie-kafka2019","partition":10},{"topic":"kafka001","partition":3},{"topic":"yinzhengjie-kafka2019","partition":6},{"topic":"yinzhengjie-kafka","partition":18},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":"__consumer_offsets","partition":13},{"topic":"yinzhengjie-kafka","partition":10},{"topic":"__consumer_offsets","partition":17},{"topic":"yinzhengjie-kafka","partition":14},{"topic":"__consumer_offsets","partition":22},{"topic":"yinzhengjie-kafka2019","partition":8},{"topic":"yinzhengjie-kafka","partition":3},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"topic":"yinzhengjie-kafka2019","partition":18},{"topic":"yinzhengjie-kafka001","partition":6},{"topic":"yinzhengjie-kafka2019","partition":9},{"topic":"__consumer_offsets","partition":48},{"topic":"yinzhengjie-kafka2019","partition":14},{"topic":"yinzhengjie-kafka","partition":7},{"topic":"__consumer_offsets","partition":18},{"topic":"kafka001","partition":0},{"topic":"__consumer_offsets","partition":32},{"topic":"yinzhengjie-kafka2019","partition":15},{"topic":"yinzhengjie-kafka","partition":19},{"topic":"yinzhengjie-kafka2019","partition":19},{"topic":"__consumer_offsets","partition":12},{"topic":"yinzhengjie-kafka001","partition":9},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}
Successfully started preferred replica election for partitions Set([yinzhengjie-kafka,6], [__consumer_offsets,32], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44], [yinzhengjie-kafka001,1], [yinzhengjie-kafka2019,15], [test,0], [__consumer_offsets,28], [yinzhengjie-kafka,8], [yinzhengjie-kafka,3], [kafka001,1], [yinzhengjie-kafka2019,12], [__consumer_offsets,17], [yinzhengjie-kafka,5], [yinzhengjie-kafka2019,6], [__consumer_offsets,23], [yinzhengjie-kafka,19], [__consumer_offsets,7], [yinzhengjie-kafka2019,10], [__consumer_offsets,4], [yinzhengjie-kafka,12], [__consumer_offsets,29], [yinzhengjie-kafka2019,19], [__consumer_offsets,35], [yinzhengjie-kafka,17], [__consumer_offsets,3], [__consumer_offsets,24], [__consumer_offsets,41], [kafka001,2], [__consumer_offsets,0], [kafka001,4], [__consumer_offsets,38], [yinzhengjie-kafka2019,11], [yinzhengjie-kafka001,7], [__consumer_offsets,13], [yinzhengjie-kafka2019,3], [__consumer_offsets,8], [yinzhengjie-kafka2019,17], [yinzhengjie-kafka2019,16], [__consumer_offsets,5], [yinzhengjie-kafka2019,13], [test,2], [__consumer_offsets,39], [__consumer_offsets,36], [yinzhengjie-kafka,11], [yinzhengjie-kafka,13], [__consumer_offsets,40], [yinzhengjie-kafka2019,4], [yinzhengjie-kafka001,3], [__consumer_offsets,45], [__consumer_offsets,15], [__consumer_offsets,33], [yinzhengjie-kafka,0], [__consumer_offsets,37], [yinzhengjie-kafka2019,18], [yinzhengjie-kafka001,8], [__consumer_offsets,21], [yinzhengjie-kafka001,9], [yinzhengjie-kafka,15], [yinzhengjie-kafka,18], [yinzhengjie-kafka,9], [yinzhengjie-kafka2019,2], [test,1], [__consumer_offsets,6], [yinzhengjie-kafka2019,5], [yinzhengjie-kafka,2], [yinzhengjie-kafka,7], [__consumer_offsets,11], [__consumer_offsets,20], [yinzhengjie-kafka001,0], [__consumer_offsets,47], [kafka001,3], [__consumer_offsets,2], [__consumer_offsets,27], [yinzhengjie-kafka001,5], [yinzhengjie-kafka2019,8], [__consumer_offsets,34], [__consumer_offsets,9], [__consumer_offsets,22], [__consumer_offsets,42], [__consumer_offsets,14], [yinzhengjie-kafka001,2], [__consumer_offsets,25], [yinzhengjie-kafka,10], [yinzhengjie-kafka001,6], [__consumer_offsets,10], [yinzhengjie-kafka001,4], [__consumer_offsets,48], [__consumer_offsets,31], [__consumer_offsets,18], [__consumer_offsets,19], [yinzhengjie-kafka2019,1], [__consumer_offsets,12], [yinzhengjie-kafka,4], [yinzhengjie-kafka2019,7], [yinzhengjie-kafka,1], [yinzhengjie-kafka2019,14], [kafka001,0], [yinzhengjie-kafka,16], [__consumer_offsets,46], [__consumer_offsets,43], [__consumer_offsets,1], [yinzhengjie-kafka2019,0], [yinzhengjie-kafka,14], [__consumer_offsets,26], [yinzhengjie-kafka2019,9], [__consumer_offsets,30])
[root@node106.yinzhengjie.org.cn ~]#
2>. Mirroring data between clusters
Copying data between different Kafka clusters is called "mirroring". Kafka's built-in cross-cluster replication tool is called MirrorMaker. It reads data from a source cluster and writes it to a target cluster. Typical use cases are migrating legacy data and cross-datacenter backup.

Multiple mirroring processes can be run to increase throughput and fault tolerance.

Data is read from topics in the source cluster and written to identically named topics in the target cluster. In effect, MirrorMaker is just a Kafka consumer and producer chained together.

The source and target clusters are completely independent: they can have different partition counts and different offsets. The mirror cluster is therefore not a fault-tolerance mechanism for the source cluster, because consumers and offsets differ.

For fault tolerance within a cluster, use the cluster's own replication instead. MirrorMaker does, however, retain and use partition keys, so ordering is preserved at the key level.

MirrorMaker is highly configurable. First, it uses one producer and multiple consumers, so all producer- and consumer-related configuration parameters can be applied to MirrorMaker. In addition, MirrorMaker has configuration parameters of its own, and these sometimes have fairly complex interdependencies.

Let's look at a MirrorMaker example first and walk through the important configuration parameters:
[root@node106.yinzhengjie.org.cn ~]# kafka-mirror-maker.sh --consumer.config /home/softwares/kafka_2.11-0.10.2.1/config/consumer.properties --producer.config /home/softwares/kafka_2.11-0.10.2.1/config/producer.properties --new.consumer --num.streams 2 --whitelist ".*"
consumer.config:
This parameter specifies the consumer configuration file. All consumers share this one configuration, which means you can only configure one source cluster and one group.id; all consumers then belong to the same consumer group, which is exactly what we want. The file has two mandatory parameters: bootstrap.servers (the address of the source cluster) and group.id. Beyond these two, any other consumer configuration parameter can be specified. The auto.commit.enable parameter generally should not be changed from its default of false: MirrorMaker commits offsets only after messages have safely arrived at the target cluster, so automatic commits cannot be used. Another parameter to know about is auto.offset.reset: with its default of latest, MirrorMaker only mirrors data that arrives at the source cluster after MirrorMaker starts; to mirror earlier data as well, set it to earliest.
producer.config:
This parameter specifies the producer configuration file. The only mandatory parameter in that file is bootstrap.servers (the address of the target cluster).
new.consumer:
MirrorMaker can only use the 0.8 or the 0.9 consumer; the 0.9 consumer is recommended because it is more stable.
num.streams:
As explained earlier, a stream is simply a consumer. All the consumers share a single producer, and MirrorMaker uses these streams to feed that one producer. If additional throughput is needed, start another MirrorMaker process.
whitelist:
This is a regular expression matching the names of the topics to mirror; every topic whose name matches will be mirrored. In this example we mirror all topics, but in practice it is better to use an expression like "ad.*" to avoid mirroring topics used for testing. You can also use "," or "|" to specify several topic names, e.g. "data.ad|adta.op|data.er". If only a few topics should be excluded from mirroring, use the --blacklist parameter instead.
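A minimal sketch of the two configuration files passed to kafka-mirror-maker.sh above (the group.id value and the target-cluster address are illustrative assumptions, not taken from the original setup):

# consumer.properties -- points at the SOURCE cluster
bootstrap.servers=node106.yinzhengjie.org.cn:9092
group.id=yinzhengjie-mirror          # illustrative group name
auto.offset.reset=earliest           # also mirror data that existed before MirrorMaker started

# producer.properties -- points at the TARGET cluster
bootstrap.servers=target-node101.example.com:9092    # illustrative target address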
3>. Decommissioning and adding cluster nodes (installing a monitoring tool such as kafka-manager or Kafka Eagle first is recommended)
Adding a new node to a Kafka cluster is easy: just give it a unique broker id and start the Kafka process. But a new server does not automatically receive any partitions; unless existing partitions are moved onto it, it will do no work until new topics are created.

The partition reassignment tool cannot automatically generate a reassignment plan for a node being decommissioned, but the partitions hosted on that node can be reassigned to the other nodes; this can be done with an automatically generated plan or with a hand-written one, the latter being slightly more troublesome.

Kafka provides the partition reassignment tool kafka-reassign-partitions.sh for this work. It can generate a reassignment plan automatically or take one specified by hand, but it cannot detect uneven partition distribution on its own, so an administrator still has to trigger it.
The tool runs in one of three mutually exclusive modes (a usage sketch follows the mode descriptions below):
(1) generate:
In this mode, you specify a set of topics and a set of brokers, and the tool generates a plan to redistribute those topics across those brokers. The mode only proposes a plan (use it with care; it may cause a lot of data movement).
(2) execute:
In this mode, the tool carries out a user-supplied reassignment plan. The plan may be hand-edited or produced by "--generate" mode.
(3) verify:
In this mode, the tool checks the status of the reassignment from the most recent "--execute" run. The status can be successfully completed, failed, or in progress.
For details, see: https://www.cnblogs.com/yinzhengjie/p/9808125.html
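As promised above, a usage sketch of the three modes (the topic, broker ids, and file names are illustrative, matching the cluster used earlier):

[root@node106.yinzhengjie.org.cn ~]# cat topics-to-move.json
{"version": 1, "topics": [{"topic": "yinzhengjie-kafka001"}]}
[root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --topics-to-move-json-file topics-to-move.json --broker-list "106,107" --generate
#Save the proposed plan it prints into reassignment.json, then execute it:
[root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file reassignment.json --execute
#Check the status of the last --execute:
[root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file reassignment.json --verify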
4>. Summary of Kafka offset-related commands
For details, see: https://www.cnblogs.com/yinzhengjie/p/10514206.html
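As a quick sketch, the most common offset-inspection commands shipped with this Kafka version (the group name is an illustrative assumption):

[root@node106.yinzhengjie.org.cn ~]# kafka-consumer-groups.sh --bootstrap-server node106.yinzhengjie.org.cn:9092 --list
[root@node106.yinzhengjie.org.cn ~]# kafka-consumer-groups.sh --bootstrap-server node106.yinzhengjie.org.cn:9092 --describe --group my-console-group
#--describe prints, per partition: the group's current offset, the log-end offset, and the lag.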
5>. Setting quotas (a quota is an upper limit on the read/write throughput of a client)
Quotas can be set at the broker configuration level, in which case they apply to all clients. By default every client has an unlimited quota. The following settings cap producer and consumer throughput at 10MB/s:

quota.producer.default=10485760
quota.consumer.default=10485760

A quota can also be set for an individual client (rarely used; just be aware of it):

[root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name ClientA --entity-type clients
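To verify the per-client override just created, the same tool can describe it (a sketch):

[root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181 --describe --entity-type clients --entity-name ClientA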
6>. Important Kafka monitoring metrics (taken from the "broker metrics" section of Kafka: The Definitive Guide)
Kafka exposes server- and client-side metrics via JMX, and tools such as jconsole can display them. Here we only list the metrics visible on the server side together with their JMX MBean names. Once you have the metrics, you can monitor Kafka with Zabbix's Java gateway plugin, or with a tool such as Open-Falcon.
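As an example, a sketch of sampling one broker metric with the bundled JmxTool (this assumes the broker was started with JMX enabled, e.g. via the JMX_PORT environment variable; the port 9999 is an illustrative assumption):

[root@node106.yinzhengjie.org.cn ~]# kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://node106.yinzhengjie.org.cn:9999/jmxrmi --reporting-interval 10000
#Prints the MessagesInPerSec rates every 10 seconds.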