主題管理web
建立主題
算法
若是配置了auto.create.topics.enable=true(這也是默認值)這樣當生產者向一個沒有建立的主題發送消息就會自動建立,其分區數量和副本數量也是有默認配置來控制的。json
# 咱們這裏建立一個3個分區每一個分區有2個副本的主題 kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create | 表示創建 |
--zookeeper | 表示ZK地址,能夠傳遞多個,用逗號分隔bootstrap --zookeeper IP:PORT,IP:PORT,IP:PORT/kafka |
--replication-factor | 表示副本數量,這裏的數量是包含Leader副本和Follower副本,副本數量不能超過代理數量 |
--partitions | 表示主題的分區數量,必須傳遞該參數。Kafka的生產者和消費者採用多線程並行對主題的消息進行處理,每一個線程處理一個分區,分區越多吞吐量就會越大,可是分區越多也意味着須要打開更多的文件句柄數量,這樣也會帶來一些開銷。 |
--topic |
表示主題名稱 |
在Zookeeper中能夠看到以下信息服務器
刪除主題網絡
刪除有兩種方式手動和自動多線程
手動方式須要刪除各個節點日誌路徑下的該主題全部分區,而且刪除zookeeper上/brokers/topics和/config/topics下的對應主題節點ide
自動刪除就是經過腳原本完成,同時須要配置服務器配置文件中的delete.topic.enable=true,默認爲false也就是說經過命令刪除主題只會刪除ZK中的節點,日誌文件不會刪除須要手動清理,若是配置爲true,則會自動刪除日誌文件。工具
kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest
下面的兩句話就是說該主題標記爲刪除/admin/delete_topics節點下。實際數據沒有影響由於該參數沒有設置爲true。
查看主題
# 列出全部主題 kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka
下面是從ZK中看到的全部主題
# 查看全部主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka
# 查看特定主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB
Replicas:是AR列表,表示副本分佈在哪些代理上,且該列表第一個元素就是Leader副本所在代理
ISR:該列表是顯示已經同步的副本集合,這個列表的副本都是存活的
# 經過--describe 和 --under-replicated-partitions 能夠查看正在同步的主題或者同步可能發生異常, # 也就是ISR列表長度小於AR列表,若是一切正常則不會返回任何東西,也能夠經過 --tipic 指定具體主題 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions
# 查看哪些主題創建時使用了單獨的配置 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides
這裏只有一個內部主題__comsumer_offsets使用了非配置文件中的設置
配置管理
所謂配置就是參數,好比修改主題的默認參數。
主題級別的
# 查看配置 kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB
這裏顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這裏沒有表示沒有爲該主題單獨設置配置,都是使用的默認配置。
# 增長一個配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2
若是修改的話仍是相同的命令,只是把值修改一下
# 刪除配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages
客戶端級別
這個主要是設置流控
# 設置指定消費者的流控 --entity-name 是客戶端在建立生產者或者消費者時是指定的client.id名稱 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME
下圖爲ZK中對應的信息
分區管理
分區平衡
Leader副本在集羣中應該是均衡分佈,由於Leader副本對外提供讀寫服務,儘量不讓同一個主題的多個Leader副本在同一個代理上,可是隨着時間推移好比故障轉移等狀況發送,Leader副本可能不均衡。有兩種方式設置自動平衡,自動和手動。
自動就是在配置文件中增長 auto.leader.rebalance.
enable
=
true 若是該項爲false,當某個節點故障恢復並從新上線後,它原來的Leader副本也不會轉移回來,只是一個Follower副本。
手動就是經過命令來執行
kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka
分區遷移
當下線一個節點須要將該節點上的分區副本遷移到其餘可用節點上,Kafka並不會自動進行分區遷移,若是不遷移就會致使某些主題數據丟失和不可用的狀況。當增長新節點時,只有新建立的主題纔會分配到新節點上,以前的主題分區不會自動分配到新節點上,由於老的分區在建立時AR列表中沒有這個新節點。
上面2個主題,每一個主題3個分區,每一個分區3個副本,咱們假設如今代理2要下線,因此咱們要把代理2上的這兩個主題的分區數據遷移出來。
# 1. 在KAFKA目錄的config目錄中創建topics-to-move.json文件 { "topics":[ { "topic":"AAA" }, { "topic":"BBB" } ], "version":1 }
# 2. 生成分區分配方案,只是生成一個方案信息而後輸出 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate
這個命令的原理是從zookeeper中讀取主題元數據信息及制定的有效代理,根據分區副本分配算法從新計算指定主題的分區副本分配方案。把【Proposed partition reassignment configuration】下面的分區方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。
# 3. 執行方案 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute
# 4. 查看進度 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify
查看結果,這裏已經沒有代理0了。
集羣擴容
上面演示了節點下線的數據遷移,這裏演示一下集羣擴容的數據遷移。咱們仍是用上面兩個主題,假設代理0又從新上線了。其實擴容就是上面的反向操做
# 1. 創建JSON文件 # 該文件和以前的相同
# 2. 生成方案並保存到一個JSON文件中 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate
# 3. 數據遷移,這裏經過--throttle作一個限流操做,若是數據過大會把網絡堵塞。 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024
查看進度和結果
增長分區
一般在須要提供吞吐量的時候咱們會增長分區,而後若是代理數量不擴大,同時生產者和消費者線程不增大,你擴展了分區也沒有用。
kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03
增長副本
集羣規模擴大而且想對全部主題或者指定主題提升可用性,那麼能夠增長原有主題的副本數量
上面是3個分區,每一個分區1個副本,咱們如今把每一個分區擴展爲3個副本
# 1. 建立JSON文件 replica-extends.json { "version": 1, "partitions": [{ "topic": "KafkaTest04", "partition": 0, "replicas": [0,1,2] }, { "topic": "KafkaTest04", "partition": 1, "replicas": [0,1,2] }, { "topic": "KafkaTest04", "partition": 2, "replicas": [0,1,2] } ] }
# 2. 執行分區副本從新分配命令 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute
查看狀態
查看結果
鏡像操做
Kafka有一個鏡像工具kafka-mirror-maker.sh,用於將一個集羣數據同步到另一個集羣中,這個很是有用,好比機房搬遷就須要進行數據同步。該工具的本質就是建立一個消費者,在源集羣中須要遷移的主題消費數據,而後建立一個生產者,將消費的數據寫入到目標集羣中。
首先建立消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)
# 源kafka集羣代理地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092 # 消費者組名 group.id=mirror
其次建立生產者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)
# 目標kafka集羣地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
運行鏡像命令
# 經過 --whitelist 指定須要鏡像的主題,經過 --blacklist 指定不須要鏡像的主題 kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC
因爲鏡像操做是啓動一個生產者和消費者,因此數據同步完成後這個生產者和消費者並不會關閉,它會依然等待新數據,因此同步完成之後你須要本身查看,確認完成了則關閉生產者和消費者。