kafka 主題管理

對於 kafka 主題(topic)的管理(增刪改查),使用最多的即是kafka自帶的腳本。node

建立主題

kafka提供了自帶的 kafka-topics 腳本,用來幫助用戶建立主題(topic)。shell

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

create 代表咱們要建立主題,而 partitions 和 replication factor 分別設置了主題的分區數以及每一個分區下的副本數。json

這裏爲何用的 --bootstrap-server 參數,而不是 --zookeeper ?

--zookeeper 參數是以前版本的用法,從kafka 2.2 版本開始,社區推薦使用 --bootstrap-server 參數替換 --zoookeeper ,而且顯式地將後者標記爲 「已過時」,所以,若是你已經在使用 2.2 版本了,那麼建立主題請指定 --bootstrap-server 參數。bootstrap

推薦使用 --bootstrap-server 而非 --zookeeper 的緣由主要有兩個。緩存

  1. 使用 --zookeeper 會繞過 Kafka 的安全體系。這就是說,即便你爲 Kafka 集羣設置了安全認證,限制了主題的建立,若是你使用 --zookeeper 的命令,依然能成功建立任意主題,不受認證體系的約束。這顯然是 Kafka 集羣的運維人員不但願看到的。
  2. 使用 --bootstrap-server 與集羣進行交互,愈來愈成爲使用 Kafka 的標準姿式。換句話說,之後會有愈來愈少的命令和 API 須要與 ZooKeeper 進行鏈接。這樣,咱們只須要一套鏈接信息,就能與 Kafka 進行全方位的交互,不用像之前同樣,必須同時維護 ZooKeeper 和 Broker 的鏈接信息。

查詢主題

建立好主題以後,Kafka 容許咱們使用相同的腳本查詢主題。你可使用下面的命令,查詢全部主題的列表。安全

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

若是要查詢單個主題的詳細數據,你可使用下面的命令。運維

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

若是 describe 命令不指定具體的主題名稱,那麼 Kafka 默認會返回全部 「可見」 主題的詳細數據給你。異步

這裏的 「可見」,是指發起這個命令的用戶可以看到的 Kafka 主題。這和前面說到主題建立時,使用 --zookeeper 和 --bootstrap-server 的區別是同樣的。若是指定了 --bootstrap-server,那麼這條命令就會受到安全認證體系的約束,即對命令發起者進行權限驗證,而後返回它能看到的主題。不然,若是指定 --zookeeper 參數,那麼默認會返回集羣中全部的主題詳細數據。基於這些緣由,我建議你最好統一使用 --bootstrap-server 鏈接參數。spa

修改主題

修改主題分區

其實就是增長分區,目前 Kafka 不容許減小某個主題的分區數。你可使用 kafka-topics 腳本,結合 --alter 參數來增長某個主題的分區數,命令以下:線程

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >

這裏要注意的是,你指定的分區數必定要比原有分區數大,不然 Kafka 會拋出 InvalidPartitionsException 異常。

修改主題級別參數

在主題建立以後,咱們可使用 kafka-configs 腳本修改對應的參數。

假設咱們要設置主題級別參數 max.message.bytes,那麼命令以下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

也許你會以爲奇怪,爲何這個腳本就要指定 --zookeeper,而不是 --bootstrap-server 呢?其實,這個腳本也能指定 --bootstrap-server 參數,只是它是用來設置動態參數的。在專欄後面,我會詳細介紹什麼是動態參數,以及動態參數都有哪些。如今,你只須要了解設置常規的主題級別參數,仍是使用 --zookeeper。

變動副本數

使用自帶的 kafka-reassign-partitions 腳本,幫助咱們增長主題的副本數。

假設kafka的內部主題 __consumer_offsets 只有 1 個副本,如今咱們想要增長至 3 個副本。下面是操做:

  1. 建立一個 json 文件,顯式提供 50 個分區對應的副本數。注意,replicas 中的 3 臺 Broker 排列順序不一樣,目的是將 Leader 副本均勻地分散在 Broker 上。該文件具體格式以下
{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}
  1. 執行 kafka-reassign-patitions 腳本,命令以下:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

除了修改內部主題,咱們可能還想查看這些內部主題的消息內容。特別是對於 __consumer_offsets 而言,因爲它保存了消費者組的位移數據,有時候直接查看該主題消息是很方便的事情。下面的命令能夠幫助咱們直接查看消費者組提交的位移數據。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

除了查看位移提交數據,咱們還能夠直接讀取該主題消息,查看消費者組的狀態信息。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

對於內部主題 __transaction_state 而言,方法是相同的。你只須要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 便可。

修改主題限速

這裏主要是指設置 Leader 副本和 Follower 副本使用的帶寬。有時候,咱們想要讓某個主題的副本在執行副本同步機制時,不要消耗過多的帶寬。Kafka 提供了這樣的功能。我來舉個例子。假設我有個主題,名爲 test,我想讓該主題各個分區的 Leader 副本和 Follower 副本在處理副本同步時,不得佔用超過 100MBps 的帶寬。注意是大寫 B,即每秒不超過 100MB。那麼,咱們應該怎麼設置呢?

要達到這個目的,咱們必須先設置 Broker 端參數 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令以下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

這條命令結尾處的 --entity-name 就是 Broker ID。假若該主題的副本分別在 0、一、二、3 多個 Broker 上,那麼你還要依次爲 Broker 一、二、3 執行這條命令。

設置好這個參數以後,咱們還須要爲該主題設置要限速的副本。在這個例子中,咱們想要爲全部副本都設置限速,所以統一使用通配符 * 來表示,命令以下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

主題分區遷移

一樣是使用 kafka-reassign-partitions 腳本,對主題各個分區的副本進行 「手術」 般的調整,好比把某些分區批量遷移到其餘 Broker 上。

刪除主題

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

刪除主題的命令並不複雜,關鍵是刪除操做是異步的,執行完這條命令不表明主題當即就被刪除了。它僅僅是被標記成 「已刪除」 狀態而已。Kafka 會在後臺默默地開啓主題刪除操做。所以,一般狀況下,你都須要耐心地等待一段時間。

主題刪除失敗

當運行完上面的刪除命令後,不少人發現已刪除主題的分區數據依然 「躺在」 硬盤上,沒有被清除。這時該怎麼辦呢?

實際上,形成主題刪除失敗的緣由有不少,最多見的緣由有兩個:

  • 副本所在的 Broker 宕機了
  • 待刪除主題的部分分區依然在執行遷移過程。

若是是由於前者,一般你重啓對應的 Broker 以後,刪除操做就能自動恢復;若是是由於後者,那就麻煩了,極可能兩個操做會相互干擾。

無論什麼緣由,一旦你碰到主題沒法刪除的問題,能夠採用這樣的方法:

  1. 手動刪除 ZooKeeper 節點 /admin/delete_topics 下以待刪除主題爲名的 znode。

  2. 手動刪除該主題在磁盤上的分區目錄。

  3. 在 ZooKeeper 中執行 rmr /controller,觸發 Controller 重選舉,刷新 Controller 緩存。

在執行最後一步時,你必定要謹慎,由於它可能形成大面積的分區 Leader 重選舉。事實上,僅僅執行前兩步也是能夠的,只是 Controller 緩存中沒有清空待刪除主題罷了,也不影響使用。

常見問題

__consumer_offsets 佔用太多的磁盤

一旦你發現這個主題消耗了過多的磁盤空間,那麼,你必定要顯式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前綴的線程狀態。一般狀況下,這都是由於該線程掛掉了,沒法及時清理此內部主題。假若真是這個緣由致使的,那咱們就只能重啓相應的 Broker 了。另外,請你注意保留出錯日誌,由於這一般都是 Bug 致使的,最好提交到社區看一下。

相關文章
相關標籤/搜索