Kafka提供了一些命令行工具,用於管理集羣變動。這些工具使用Java實現,Kafka提供了一些腳本調用這些Java類。正則表達式
9.1主題操做json
使用Kafka-topics.sh工具能夠執行主題大部分工做,咱們能夠用它建立,修改,刪除和查看集羣的主題。要使用該工具的所有功能,須要經過—zookeeper參數提供zookeeper鏈接字符串。bootstrap
建立主題安全
建立主題須要三個參數:主題名,複製係數和分區負載均衡
例如:建立名爲my-topic的主題,包含8個分區,每一個分區擁有2個副本ide
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —create my-topic —replicaton-factor 2 —partition 8工具
增長分區編碼
增長分區主要是爲了擴展主題容量或者下降單個分區的吞吐量,若是單個消費者羣組內運行更多消費者,也須要增長分區。命令行
例如:將my-topic主題分區數量增長到16日誌
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —alter —topic my-topic —partition 16
沒法減小分區數量,若是必定要刪,能夠先刪除主題,而後從新建立
刪除主題
爲了刪除主題,broker的delete.topic.enable必須爲true,若是爲false,刪除主題的請求會被忽略。
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —topic my-topic
列出集羣全部主題
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —list
列出全部主題詳細信息
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe
使用—topics-with-overrides參數能夠找到全部包含覆蓋配置的主題,它會列出包含了與集羣不同的配置的主題。
有兩個參數能夠找出有問題的分區。
使用—under-replicated-partitions參數能夠列出全部包含不一樣步副本的分區。
使用—unavailable-partitions參數的能夠列出全部沒有首領的分區。這些分區已經離線,對於生產者和消費者都不可用。
Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe —under-replicated-partitions
9.2消費者羣組
Kafka有兩個地方保存消費者羣組信息,舊版本在zookeeper,新版本在broker,Kafka-consumer-groups.sh 能夠列出上述兩種消費者羣組。它能夠用於刪除消費者羣組和偏移量,舊版本須要zookeeper地址,新版本須要broker地址。
列出並描述羣組
示例:列出舊版本消費者羣組
Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —list
示例:列出新版本消費者羣組
Kafka-consumer-groups.sh —new-consumer —bootstrap-server kafka1.example.com:9092/kafka-cluster —list
對於列出任意羣組來講,使用—describe代替—list,經過—group指定特定羣組就能夠獲取羣組的詳細信息。它會列出羣組全部主題的信息和每一個分區偏移量。
示例:列出舊版本消費者羣組testgroup的詳細信息
Kafka-consumer-groups.sh —new-consumer —bootstrap-server kafka1.example.com:9092/kafka-cluster —describe —group test group
刪除消費者羣組
只有舊版本消費者客戶端支持刪除羣組。刪除羣組將從zookeeper刪除羣組和全部偏移量。執行操做以前,必須關閉全部消費者。
示例:刪除消費者羣組testgroup
Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —group test group
該命令也能夠用於不刪除整個羣組的狀況下刪除單個主題的偏移量。(刪除以前,必須關閉消費者)
示例:從消費者羣組testgroup刪除my-topic的主題偏移量。
Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —group test group —topic my-topic
偏移量管理
除了能夠顯示和刪除消費者羣組的偏移量以外,還能夠獲取偏移量,並保存批次的最新偏移量,從而實現偏移量重置。
1.導出偏移量
可使用Kafka-run-class.sh調用底層Java類實現導出,會生成一個文件,包含分區和偏移量信息
示例:將羣組testgroup的偏移量導出到offsets文件
Kafka-run-calss.sh kafka.tools.ExportZkoffsets —zkconnect zoo1.example.com:2181/kafka-cluster —group testgroup —output-file offsets
2.導入偏移量
導入偏移量以前,必須先關閉消費者。
示例:從offsets文件將偏移量導入消費者羣組testgroup
Kafka-run-calss.sh kafka.tools.ImportZkoffsets —zkconnect zoo1.example.com:2181/kafka-cluster —group testgroup —input-file offsets
9.3動態配置變動
咱們能夠在集羣運行狀態時覆蓋主題配置和客戶端配置。不論是在工具仍是文檔,它們所說的動態配置參數都是基於主題或客戶端的,都是能夠被覆蓋的。
覆蓋主題的默認配置
更改主題配置的命令格式以下:
Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster -alter —entitiy-type topics —entity-name <topicname> —add-config key=value[key=value]
示例:將主題my-topic的消息保留時間設爲1小時
Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster -alter —entitiy-type topics —entity-name my-topic —add-config retention.ms=3600000
覆蓋客戶端的默認配置
對於Kafka客戶端來講,只能覆蓋生產者配額和消費者配額。這兩個參數都以字節每秒爲參數,表示客戶端每一個broker上的生成速率和消費速率
更改客戶端配置的命令以下:
Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster —alter —entity-type clients —entity-name <client id>
—add-config key=value
可用的客戶端配置參數,producer_bytes-rate,consumer_bytes_rate
列出被覆蓋的配置
使用命令行參數能夠列出全部被覆蓋的配置,從而用於檢查主題和客戶端配置。示例:列出主題my-topic全部被覆蓋的配置
Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe —entity-type topics —entity-name my-topic
移除被覆蓋的配置
動態的配置徹底能夠被移除,從而恢復到集羣的默認配置。可使用alter命令和—delete-config參數來刪除被覆蓋的配置。
示例:刪除主題my-topic的retention.ms覆蓋配置
Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster
—alter —entity-type topics —entity-name my-topic
—delete-config retention.ms
9.4分區管理
Kafka提供兩個腳本管理分區,一個用於從新選舉首領,一個用於將分區分配給broker,結合這兩個工具,就能夠實現集羣流量的負載均衡。
使用多個分區副本能夠提高可靠性,不過只有其中一個副本能夠成爲分區首領,只有首領所在broker能夠進行生產和消費活動。Kafka將副本清單第一個同步副本選爲首領,但在關閉並重啓broker以後,並不會自動恢復原先的首領身份。broker有一個配置能夠用於啓動自動首領再均衡,不過不建議在生產環境使用該功能。
經過觸發首選的副本選舉,可讓broker從新得到首領。當該事件被觸發時,集羣控制器會爲分區從新選擇理想的首領。也能夠經過Kafka-preferred-replica-election.sh 工具手動觸發選舉。
示例:啓動首選的副本選舉
Kafka-preferred-replica-election.sh —zookeeper zoo1.example.com:2181/kafka-cluster
進行選舉時,集羣的元數據必須被寫到zookeeper上,若是元數據超過了節點運行的大小(默認時1MB),選舉就會失敗。
這個時候須要把分區清單信息寫到json文件,分多個步驟進行:
示例:經過在partitions.json文件裏指定分區清單來啓動副本選舉
Kafka-perferred-replica-election.sh —zookeeper zoo1.example.com:2181/kafka-cluster —path-to-json-file partitons.json
9.4.2修改分區副本
有時須要修改分區副本,如下時須要修改分區副本的場景
1.主題分區在整個集羣的不均衡分佈形成集羣負載不均衡
2.broker離線形成分區不一樣步
3.新加入的broker須要從集羣得到負載。
可使用Kafka-reassign-partitions.sh 工具來修改分區。
第一步:根據broker清單和主題清單生成一組遷移步驟;
第二步,執行遷移步驟;
第三步:可使用生成的遷移步驟驗證分區重分配的進度和完成狀況。
示例:爲topics.json文件裏的主題生成遷移步驟,將這些主題遷移到broker0和broker1上。
Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —generate —topics-to-move-json-file topics.json —broker-list 0,1
這個工具會在控制檯輸出兩個json,分別描述裏當前分區和建議分區分配方案,能夠把第一個json保存起來,以備回滾。
第二個json應該被保存到另外一個文件,做爲Kafka-reassign-partitons.sh工具的輸入來執行第二個步驟。
示例:使用reassign.json來執行建議的分區分配方案
Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —execute —reassignment-json-file reassign.json
該命令會將指定分區的副本從新分配到新的broker上。
在重分配進行過程當中或完成以後,可使用Kafka-reassign-partitions.sh工具驗證重分配的狀態。
示例:驗證reassign.json文件裏指定的分區重分配狀況。
Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —verify —reassignment-json-file reassign.json
9.4.3修改複製係數
分區重分配工具提供了一些特性,能夠改變分區的複製係數
9.4.4轉儲日誌片斷
若是須要查看某個特定消息的內容,可使用工具來解碼分區的日誌片斷。該工具可讓你在不讀取消息的狀況下查看消息的內容。它接受一個以逗號分隔的日誌片斷文件清單做爲參數,並打印出每一個消息的概要信息和數據內容。
示例:解碼日誌片斷0001.log,顯示消息的概要信息
Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.log
示例:解碼日誌片斷0001.log,顯示消息的數據內容
Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.log —print-data-log
這個片斷也能夠用於檢查日誌片斷的索引文件。有兩個參數能夠用於指定不一樣程度的驗證,—index-sanity-check將會檢查無用的索引。而—verify-index-only將會檢查索引的匹配度,但不會打印出全部索引。
示例:驗證日誌片斷0001.log索引文件的正確性
Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.index,0001.log —index-sanity-check
9.4.5副本驗證
分區複製原理與消費者客戶端相似,跟隨者broker按期將上一個偏移量到當前偏移量之間的數據複製到磁盤上。若是複製中止並重啓,它會從上一個檢查點繼續複製,若是以前複製的日誌被刪除,跟隨者不作任何補償。
可使用Kafka-replica-verification.sh工具驗證集羣分區副本的一致性。它會從指定分區副本獲取消息,並檢查全部副本是否具備相同消息。使用正則表達式將待驗證主題的名字傳給他,還須要顯示提供broker地址清單。
示例:對broker1和broker2上以my-開頭的主題副本進行驗證
Kafka-replica-verification.sh —broker-list kafka1.example.com:9092,kafka2.example.com:9092 —topic-white-list ‘my-.*’
9.5消費和生產
若是須要手動讀取和生成消息,能夠藉助Kafka-console-consumer.sh和Kafka-console-producer.sh兩個工具,它們包裝Java客戶端,讓用戶不須要編寫應用程序就能夠與Kafka主題進行交互。
9.5.1控制檯消費者
Kafka-console-consumer.sh 工具提供一種從一個或多個主題讀取消息的方式。消息被打印到標準輸出,消息之間以空行分隔,默認狀況下沒有格式化。
有一些參數時必選的。
第一步:指定是否要使用新版本消費者,並指導Kafka地址。若是使用舊版本消費者,只須要提供—zookeeper參數,後面跟Kafka集羣的鏈接字符串。
第二步:須要指定主題,有三個可選參數分別是
—topic
—whitelist
—blacklist
示例:使用舊版消費者讀取單個主題my-topic
Kafka-console-consumer.sh —zookeeper
zoo1.example.com:2181/Kafka-cluster —topic my-topic
除了基本參數外,也能夠把其它參數傳給控制檯消費者,能夠經過兩種方式:
第一:將配置參數寫到一個文件,而後經過—consumer.config filename指定配置文件
第二:直接在命令行以—consumer-property key=value的格式傳遞一個或多個參數。
控制檯消費者的其餘經常使用配置:
—formatter classname
指定消息格式化器的類名,用於解碼消息,默認是Kafka.tools.DefaultFormatter
—from-beginning
指定從最舊的偏移量讀取數據,不然就從最新偏移量開始讀取
—max-messages num
指定退出以前最多讀取多少消息
—partition num
指定只讀取ID爲num的分區
讀取偏移量主題
有時候咱們須要指定提交的消費者羣組的偏移量少多少,這個能夠經過讓控制檯消費者讀取一個特定__consumer__offsets來實現.
爲了解碼這個主題,須要使用Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter格式化器。
示例:從偏移量主題讀取一個消息
Kafka-console-consumer.sh —zookeeper zoo1.example.com:2181/kafka-cluster —topic __consumer__offsets —formatter ‘kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter’
9.5.2控制檯生產者
Kafka-console-producer.sh工具能夠用於想Kafka主題寫入消息。默認狀況下,一行時一個消息,鍵值以tab分隔,若是沒有tab,那麼鍵爲null
有兩個參數必須指定:
—broker-list參數指定了一個或多個broker,逗號分隔
—topic指定了目標主題
與控制檯消費者同樣,控制檯生產者能夠接受普通生產者的配置參數,經過兩種方式實現:
第一:經過—producer.config file指定配置文件
第二:在命令行—producer-property key=value傳遞一個或多個參數
有不少參數能夠調整行爲:
—key-serializer classname
指定消息鍵的編碼器名
—value-serializer classname
指定消息的value的編碼器名
—compression-codec string
指定壓縮類型
—sync
指定以同步方式生成消息,也就是說,發送下一個消息以前會等待當前消息獲得確認。
9.7不安全的操做
移動集羣控制器
取消分區重分配
移動待刪除的主題
手動刪除主題