Kafka筆記7

Kafka提供了一些命令行工具,用於管理集羣變動。這些工具使用Java實現,Kafka提供了一些腳本調用這些Java類。正則表達式

9.1主題操做json

使用Kafka-topics.sh工具能夠執行主題大部分工做,咱們能夠用它建立,修改,刪除和查看集羣的主題。要使用該工具的所有功能,須要經過—zookeeper參數提供zookeeper鏈接字符串。bootstrap

建立主題安全

建立主題須要三個參數:主題名,複製係數和分區負載均衡

例如:建立名爲my-topic的主題,包含8個分區,每一個分區擁有2個副本ide

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —create my-topic —replicaton-factor 2 —partition 8工具

增長分區編碼

增長分區主要是爲了擴展主題容量或者下降單個分區的吞吐量,若是單個消費者羣組內運行更多消費者,也須要增長分區。命令行

例如:將my-topic主題分區數量增長到16日誌

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —alter —topic my-topic —partition 16

沒法減小分區數量,若是必定要刪,能夠先刪除主題,而後從新建立

刪除主題

爲了刪除主題,broker的delete.topic.enable必須爲true,若是爲false,刪除主題的請求會被忽略。

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —topic my-topic

列出集羣全部主題

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —list

列出全部主題詳細信息

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe

使用—topics-with-overrides參數能夠找到全部包含覆蓋配置的主題,它會列出包含了與集羣不同的配置的主題。

有兩個參數能夠找出有問題的分區。

使用—under-replicated-partitions參數能夠列出全部包含不一樣步副本的分區。

使用—unavailable-partitions參數的能夠列出全部沒有首領的分區。這些分區已經離線,對於生產者和消費者都不可用。

Kafka-topics.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe —under-replicated-partitions

9.2消費者羣組

Kafka有兩個地方保存消費者羣組信息,舊版本在zookeeper,新版本在broker,Kafka-consumer-groups.sh 能夠列出上述兩種消費者羣組。它能夠用於刪除消費者羣組和偏移量,舊版本須要zookeeper地址,新版本須要broker地址。

列出並描述羣組

示例:列出舊版本消費者羣組

Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —list

示例:列出新版本消費者羣組

Kafka-consumer-groups.sh —new-consumer —bootstrap-server kafka1.example.com:9092/kafka-cluster —list

對於列出任意羣組來講,使用—describe代替—list,經過—group指定特定羣組就能夠獲取羣組的詳細信息。它會列出羣組全部主題的信息和每一個分區偏移量。

示例:列出舊版本消費者羣組testgroup的詳細信息

Kafka-consumer-groups.sh —new-consumer —bootstrap-server kafka1.example.com:9092/kafka-cluster —describe —group test group

刪除消費者羣組

只有舊版本消費者客戶端支持刪除羣組。刪除羣組將從zookeeper刪除羣組和全部偏移量。執行操做以前,必須關閉全部消費者。

示例:刪除消費者羣組testgroup

Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —group test group

該命令也能夠用於不刪除整個羣組的狀況下刪除單個主題的偏移量。(刪除以前,必須關閉消費者)

示例:從消費者羣組testgroup刪除my-topic的主題偏移量。

Kafka-consumer-groups.sh —zookeeper zoo1.example.com:2181/kafka-cluster —delete —group test group —topic my-topic

偏移量管理

除了能夠顯示和刪除消費者羣組的偏移量以外,還能夠獲取偏移量,並保存批次的最新偏移量,從而實現偏移量重置。

1.導出偏移量

可使用Kafka-run-class.sh調用底層Java類實現導出,會生成一個文件,包含分區和偏移量信息

示例:將羣組testgroup的偏移量導出到offsets文件

Kafka-run-calss.sh kafka.tools.ExportZkoffsets —zkconnect zoo1.example.com:2181/kafka-cluster —group testgroup —output-file offsets

2.導入偏移量

導入偏移量以前,必須先關閉消費者。

示例:從offsets文件將偏移量導入消費者羣組testgroup

Kafka-run-calss.sh kafka.tools.ImportZkoffsets —zkconnect zoo1.example.com:2181/kafka-cluster —group testgroup —input-file offsets

 

9.3動態配置變動

咱們能夠在集羣運行狀態時覆蓋主題配置和客戶端配置。不論是在工具仍是文檔,它們所說的動態配置參數都是基於主題或客戶端的,都是能夠被覆蓋的。

覆蓋主題的默認配置

更改主題配置的命令格式以下:

Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster -alter —entitiy-type topics —entity-name <topicname> —add-config key=value[key=value]

示例:將主題my-topic的消息保留時間設爲1小時

Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster -alter —entitiy-type topics —entity-name my-topic —add-config retention.ms=3600000

覆蓋客戶端的默認配置

對於Kafka客戶端來講,只能覆蓋生產者配額和消費者配額。這兩個參數都以字節每秒爲參數,表示客戶端每一個broker上的生成速率和消費速率

更改客戶端配置的命令以下:

Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster —alter —entity-type clients —entity-name <client id> 

—add-config key=value

可用的客戶端配置參數,producer_bytes-rate,consumer_bytes_rate

列出被覆蓋的配置

使用命令行參數能夠列出全部被覆蓋的配置,從而用於檢查主題和客戶端配置。示例:列出主題my-topic全部被覆蓋的配置

Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster —describe —entity-type topics —entity-name my-topic

移除被覆蓋的配置

動態的配置徹底能夠被移除,從而恢復到集羣的默認配置。可使用alter命令和—delete-config參數來刪除被覆蓋的配置。

示例:刪除主題my-topic的retention.ms覆蓋配置

Kafka-configs.sh —zookeeper zoo1.example.com:2181/kafka-cluster

—alter —entity-type topics —entity-name my-topic

—delete-config retention.ms

9.4分區管理

Kafka提供兩個腳本管理分區,一個用於從新選舉首領,一個用於將分區分配給broker,結合這兩個工具,就能夠實現集羣流量的負載均衡。

使用多個分區副本能夠提高可靠性,不過只有其中一個副本能夠成爲分區首領,只有首領所在broker能夠進行生產和消費活動。Kafka將副本清單第一個同步副本選爲首領,但在關閉並重啓broker以後,並不會自動恢復原先的首領身份。broker有一個配置能夠用於啓動自動首領再均衡,不過不建議在生產環境使用該功能。

經過觸發首選的副本選舉,可讓broker從新得到首領。當該事件被觸發時,集羣控制器會爲分區從新選擇理想的首領。也能夠經過Kafka-preferred-replica-election.sh 工具手動觸發選舉。

示例:啓動首選的副本選舉

Kafka-preferred-replica-election.sh —zookeeper zoo1.example.com:2181/kafka-cluster

進行選舉時,集羣的元數據必須被寫到zookeeper上,若是元數據超過了節點運行的大小(默認時1MB),選舉就會失敗。

這個時候須要把分區清單信息寫到json文件,分多個步驟進行:

示例:經過在partitions.json文件裏指定分區清單來啓動副本選舉

Kafka-perferred-replica-election.sh —zookeeper zoo1.example.com:2181/kafka-cluster —path-to-json-file partitons.json

9.4.2修改分區副本

有時須要修改分區副本,如下時須要修改分區副本的場景

1.主題分區在整個集羣的不均衡分佈形成集羣負載不均衡

2.broker離線形成分區不一樣步

3.新加入的broker須要從集羣得到負載。

可使用Kafka-reassign-partitions.sh 工具來修改分區。

第一步:根據broker清單和主題清單生成一組遷移步驟;

第二步,執行遷移步驟;

第三步:可使用生成的遷移步驟驗證分區重分配的進度和完成狀況。

示例:爲topics.json文件裏的主題生成遷移步驟,將這些主題遷移到broker0和broker1上。

Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —generate —topics-to-move-json-file topics.json —broker-list 0,1

這個工具會在控制檯輸出兩個json,分別描述裏當前分區和建議分區分配方案,能夠把第一個json保存起來,以備回滾。

第二個json應該被保存到另外一個文件,做爲Kafka-reassign-partitons.sh工具的輸入來執行第二個步驟。

示例:使用reassign.json來執行建議的分區分配方案

Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —execute —reassignment-json-file reassign.json

該命令會將指定分區的副本從新分配到新的broker上。

在重分配進行過程當中或完成以後,可使用Kafka-reassign-partitions.sh工具驗證重分配的狀態。

示例:驗證reassign.json文件裏指定的分區重分配狀況。

Kafka-reassign-partitions.sh —zookeeper zoo1.example.com:2181/kafka-cluster —verify —reassignment-json-file reassign.json

9.4.3修改複製係數

分區重分配工具提供了一些特性,能夠改變分區的複製係數

9.4.4轉儲日誌片斷

若是須要查看某個特定消息的內容,可使用工具來解碼分區的日誌片斷。該工具可讓你在不讀取消息的狀況下查看消息的內容。它接受一個以逗號分隔的日誌片斷文件清單做爲參數,並打印出每一個消息的概要信息和數據內容。

示例:解碼日誌片斷0001.log,顯示消息的概要信息

Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.log

示例:解碼日誌片斷0001.log,顯示消息的數據內容

Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.log —print-data-log

這個片斷也能夠用於檢查日誌片斷的索引文件。有兩個參數能夠用於指定不一樣程度的驗證,—index-sanity-check將會檢查無用的索引。而—verify-index-only將會檢查索引的匹配度,但不會打印出全部索引。

示例:驗證日誌片斷0001.log索引文件的正確性

Kafka-run-class.sh kafka.tools.DumpLogSegments —files 0001.index,0001.log —index-sanity-check

9.4.5副本驗證

分區複製原理與消費者客戶端相似,跟隨者broker按期將上一個偏移量到當前偏移量之間的數據複製到磁盤上。若是複製中止並重啓,它會從上一個檢查點繼續複製,若是以前複製的日誌被刪除,跟隨者不作任何補償。

可使用Kafka-replica-verification.sh工具驗證集羣分區副本的一致性。它會從指定分區副本獲取消息,並檢查全部副本是否具備相同消息。使用正則表達式將待驗證主題的名字傳給他,還須要顯示提供broker地址清單。

示例:對broker1和broker2上以my-開頭的主題副本進行驗證

Kafka-replica-verification.sh —broker-list kafka1.example.com:9092,kafka2.example.com:9092 —topic-white-list ‘my-.*’

9.5消費和生產

若是須要手動讀取和生成消息,能夠藉助Kafka-console-consumer.sh和Kafka-console-producer.sh兩個工具,它們包裝Java客戶端,讓用戶不須要編寫應用程序就能夠與Kafka主題進行交互。

9.5.1控制檯消費者

Kafka-console-consumer.sh 工具提供一種從一個或多個主題讀取消息的方式。消息被打印到標準輸出,消息之間以空行分隔,默認狀況下沒有格式化。

有一些參數時必選的。

第一步:指定是否要使用新版本消費者,並指導Kafka地址。若是使用舊版本消費者,只須要提供—zookeeper參數,後面跟Kafka集羣的鏈接字符串。

第二步:須要指定主題,有三個可選參數分別是

—topic

—whitelist

—blacklist

示例:使用舊版消費者讀取單個主題my-topic

Kafka-console-consumer.sh —zookeeper

zoo1.example.com:2181/Kafka-cluster —topic my-topic

除了基本參數外,也能夠把其它參數傳給控制檯消費者,能夠經過兩種方式:

第一:將配置參數寫到一個文件,而後經過—consumer.config filename指定配置文件

第二:直接在命令行以—consumer-property key=value的格式傳遞一個或多個參數。

控制檯消費者的其餘經常使用配置:

—formatter classname

指定消息格式化器的類名,用於解碼消息,默認是Kafka.tools.DefaultFormatter

—from-beginning

指定從最舊的偏移量讀取數據,不然就從最新偏移量開始讀取

—max-messages num 

指定退出以前最多讀取多少消息

—partition num

指定只讀取ID爲num的分區

讀取偏移量主題

有時候咱們須要指定提交的消費者羣組的偏移量少多少,這個能夠經過讓控制檯消費者讀取一個特定__consumer__offsets來實現.

爲了解碼這個主題,須要使用Kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter格式化器。

示例:從偏移量主題讀取一個消息

Kafka-console-consumer.sh —zookeeper zoo1.example.com:2181/kafka-cluster —topic __consumer__offsets —formatter ‘kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter’

9.5.2控制檯生產者

Kafka-console-producer.sh工具能夠用於想Kafka主題寫入消息。默認狀況下,一行時一個消息,鍵值以tab分隔,若是沒有tab,那麼鍵爲null

有兩個參數必須指定:

—broker-list參數指定了一個或多個broker,逗號分隔

—topic指定了目標主題

與控制檯消費者同樣,控制檯生產者能夠接受普通生產者的配置參數,經過兩種方式實現:

第一:經過—producer.config file指定配置文件

第二:在命令行—producer-property key=value傳遞一個或多個參數

有不少參數能夠調整行爲:

—key-serializer classname

指定消息鍵的編碼器名

—value-serializer classname 

指定消息的value的編碼器名

—compression-codec string

指定壓縮類型

—sync

指定以同步方式生成消息,也就是說,發送下一個消息以前會等待當前消息獲得確認。

9.7不安全的操做

移動集羣控制器

取消分區重分配

移動待刪除的主題

手動刪除主題

相關文章
相關標籤/搜索