對於 kafka 主題(topic)的管理(增刪改查),使用最多的即是kafka自帶的腳本。node
建立主題
kafka提供了自帶的 kafka-topics
腳本,用來幫助用戶建立主題(topic)。shell
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
create 代表咱們要建立主題,而 partitions 和 replication factor 分別設置了主題的分區數以及每一個分區下的副本數。json
這裏爲何用的 --bootstrap-server
參數,而不是 --zookeeper
?
--zookeeper
參數是以前版本的用法,從kafka 2.2 版本開始,社區推薦使用 --bootstrap-server
參數替換 --zoookeeper
,而且顯式地將後者標記爲 「已過時」,所以,若是你已經在使用 2.2 版本了,那麼建立主題請指定 --bootstrap-server
參數。bootstrap
推薦使用 --bootstrap-server
而非 --zookeeper
的緣由主要有兩個。緩存
- 使用 --zookeeper 會繞過 Kafka 的安全體系。這就是說,即便你爲 Kafka 集羣設置了安全認證,限制了主題的建立,若是你使用 --zookeeper 的命令,依然能成功建立任意主題,不受認證體系的約束。這顯然是 Kafka 集羣的運維人員不但願看到的。
- 使用 --bootstrap-server 與集羣進行交互,愈來愈成爲使用 Kafka 的標準姿式。換句話說,之後會有愈來愈少的命令和 API 須要與 ZooKeeper 進行鏈接。這樣,咱們只須要一套鏈接信息,就能與 Kafka 進行全方位的交互,不用像之前同樣,必須同時維護 ZooKeeper 和 Broker 的鏈接信息。
查詢主題
建立好主題以後,Kafka 容許咱們使用相同的腳本查詢主題。你可使用下面的命令,查詢全部主題的列表。安全
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
若是要查詢單個主題的詳細數據,你可使用下面的命令。運維
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
若是 describe 命令不指定具體的主題名稱,那麼 Kafka 默認會返回全部 「可見」 主題的詳細數據給你。異步
這裏的 「可見」,是指發起這個命令的用戶可以看到的 Kafka 主題。這和前面說到主題建立時,使用 --zookeeper 和 --bootstrap-server 的區別是同樣的。若是指定了 --bootstrap-server,那麼這條命令就會受到安全認證體系的約束,即對命令發起者進行權限驗證,而後返回它能看到的主題。不然,若是指定 --zookeeper 參數,那麼默認會返回集羣中全部的主題詳細數據。基於這些緣由,我建議你最好統一使用 --bootstrap-server 鏈接參數。spa
修改主題
修改主題分區
其實就是增長分區,目前 Kafka 不容許減小某個主題的分區數。你可使用 kafka-topics 腳本,結合 --alter 參數來增長某個主題的分區數,命令以下:線程
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >
這裏要注意的是,你指定的分區數必定要比原有分區數大,不然 Kafka 會拋出 InvalidPartitionsException 異常。
修改主題級別參數
在主題建立以後,咱們可使用 kafka-configs 腳本修改對應的參數。
假設咱們要設置主題級別參數 max.message.bytes,那麼命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
也許你會以爲奇怪,爲何這個腳本就要指定 --zookeeper,而不是 --bootstrap-server 呢?其實,這個腳本也能指定 --bootstrap-server 參數,只是它是用來設置動態參數的。在專欄後面,我會詳細介紹什麼是動態參數,以及動態參數都有哪些。如今,你只須要了解設置常規的主題級別參數,仍是使用 --zookeeper。
變動副本數
使用自帶的 kafka-reassign-partitions 腳本,幫助咱們增長主題的副本數。
假設kafka的內部主題 __consumer_offsets
只有 1 個副本,如今咱們想要增長至 3 個副本。下面是操做:
- 建立一個 json 文件,顯式提供 50 個分區對應的副本數。注意,replicas 中的 3 臺 Broker 排列順序不一樣,目的是將 Leader 副本均勻地分散在 Broker 上。該文件具體格式以下
{"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]}, {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]}, ... {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]} ]}
- 執行
kafka-reassign-patitions
腳本,命令以下:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
除了修改內部主題,咱們可能還想查看這些內部主題的消息內容。特別是對於 __consumer_offsets 而言,因爲它保存了消費者組的位移數據,有時候直接查看該主題消息是很方便的事情。下面的命令能夠幫助咱們直接查看消費者組提交的位移數據。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
除了查看位移提交數據,咱們還能夠直接讀取該主題消息,查看消費者組的狀態信息。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
對於內部主題 __transaction_state 而言,方法是相同的。你只須要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 便可。
修改主題限速
這裏主要是指設置 Leader 副本和 Follower 副本使用的帶寬。有時候,咱們想要讓某個主題的副本在執行副本同步機制時,不要消耗過多的帶寬。Kafka 提供了這樣的功能。我來舉個例子。假設我有個主題,名爲 test,我想讓該主題各個分區的 Leader 副本和 Follower 副本在處理副本同步時,不得佔用超過 100MBps 的帶寬。注意是大寫 B,即每秒不超過 100MB。那麼,咱們應該怎麼設置呢?
要達到這個目的,咱們必須先設置 Broker 端參數 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
這條命令結尾處的 --entity-name 就是 Broker ID。假若該主題的副本分別在 0、一、二、3 多個 Broker 上,那麼你還要依次爲 Broker 一、二、3 執行這條命令。
設置好這個參數以後,咱們還須要爲該主題設置要限速的副本。在這個例子中,咱們想要爲全部副本都設置限速,所以統一使用通配符 * 來表示,命令以下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
主題分區遷移
一樣是使用 kafka-reassign-partitions 腳本,對主題各個分區的副本進行 「手術」 般的調整,好比把某些分區批量遷移到其餘 Broker 上。
刪除主題
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
刪除主題的命令並不複雜,關鍵是刪除操做是異步的,執行完這條命令不表明主題當即就被刪除了。它僅僅是被標記成 「已刪除」 狀態而已。Kafka 會在後臺默默地開啓主題刪除操做。所以,一般狀況下,你都須要耐心地等待一段時間。
主題刪除失敗
當運行完上面的刪除命令後,不少人發現已刪除主題的分區數據依然 「躺在」 硬盤上,沒有被清除。這時該怎麼辦呢?
實際上,形成主題刪除失敗的緣由有不少,最多見的緣由有兩個:
- 副本所在的 Broker 宕機了
- 待刪除主題的部分分區依然在執行遷移過程。
若是是由於前者,一般你重啓對應的 Broker 以後,刪除操做就能自動恢復;若是是由於後者,那就麻煩了,極可能兩個操做會相互干擾。
無論什麼緣由,一旦你碰到主題沒法刪除的問題,能夠採用這樣的方法:
-
手動刪除 ZooKeeper 節點 /admin/delete_topics 下以待刪除主題爲名的 znode。
-
手動刪除該主題在磁盤上的分區目錄。
-
在 ZooKeeper 中執行 rmr /controller,觸發 Controller 重選舉,刷新 Controller 緩存。
在執行最後一步時,你必定要謹慎,由於它可能形成大面積的分區 Leader 重選舉。事實上,僅僅執行前兩步也是能夠的,只是 Controller 緩存中沒有清空待刪除主題罷了,也不影響使用。
常見問題
__consumer_offsets 佔用太多的磁盤
一旦你發現這個主題消耗了過多的磁盤空間,那麼,你必定要顯式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前綴的線程狀態。一般狀況下,這都是由於該線程掛掉了,沒法及時清理此內部主題。假若真是這個緣由致使的,那咱們就只能重啓相應的 Broker 了。另外,請你注意保留出錯日誌,由於這一般都是 Bug 致使的,最好提交到社區看一下。