Kafka(六)Kafka基本客戶端命令操做


主題管理web

建立主題
算法

若是配置了auto.create.topics.enable=true(這也是默認值)這樣當生產者向一個沒有建立的主題發送消息就會自動建立,其分區數量和副本數量也是有默認配置來控制的。json

# 咱們這裏建立一個3個分區每一個分區有2個副本的主題
kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create 表示創建
--zookeeper

表示ZK地址,能夠傳遞多個,用逗號分隔bootstrap

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka
bash

--replication-factor 表示副本數量,這裏的數量是包含Leader副本和Follower副本,副本數量不能超過代理數量
--partitions 表示主題的分區數量,必須傳遞該參數。Kafka的生產者和消費者採用多線程並行對主題的消息進行處理,每一個線程處理一個分區,分區越多吞吐量就會越大,可是分區越多也意味着須要打開更多的文件句柄數量,這樣也會帶來一些開銷。
--topic
表示主題名稱

image.png

在Zookeeper中能夠看到以下信息服務器

image.png

刪除主題網絡

刪除有兩種方式手動和自動多線程

  • 手動方式須要刪除各個節點日誌路徑下的該主題全部分區,而且刪除zookeeper上/brokers/topics和/config/topics下的對應主題節點ide

  • 自動刪除就是經過腳原本完成,同時須要配置服務器配置文件中的delete.topic.enable=true,默認爲false也就是說經過命令刪除主題只會刪除ZK中的節點,日誌文件不會刪除須要手動清理,若是配置爲true,則會自動刪除日誌文件。工具

kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

image.png

下面的兩句話就是說該主題標記爲刪除/admin/delete_topics節點下。實際數據沒有影響由於該參數沒有設置爲true。

查看主題

# 列出全部主題
kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

image.png

下面是從ZK中看到的全部主題

image.png

# 查看全部主題信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

image.png

# 查看特定主題信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

image.png

Replicas:是AR列表,表示副本分佈在哪些代理上,且該列表第一個元素就是Leader副本所在代理

ISR:該列表是顯示已經同步的副本集合,這個列表的副本都是存活的

# 經過--describe 和 --under-replicated-partitions 能夠查看正在同步的主題或者同步可能發生異常,
# 也就是ISR列表長度小於AR列表,若是一切正常則不會返回任何東西,也能夠經過 --tipic 指定具體主題
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions
# 查看哪些主題創建時使用了單獨的配置 
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

這裏只有一個內部主題__comsumer_offsets使用了非配置文件中的設置

image.png


配置管理

所謂配置就是參數,好比修改主題的默認參數。

主題級別的

# 查看配置
kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

這裏顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這裏沒有表示沒有爲該主題單獨設置配置,都是使用的默認配置。

image.png

# 增長一個配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

image.png

若是修改的話仍是相同的命令,只是把值修改一下

image.png

# 刪除配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

image.png

客戶端級別

這個主要是設置流控

# 設置指定消費者的流控 --entity-name 是客戶端在建立生產者或者消費者時是指定的client.id名稱
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

image.png

下圖爲ZK中對應的信息

image.png


分區管理

分區平衡

Leader副本在集羣中應該是均衡分佈,由於Leader副本對外提供讀寫服務,儘量不讓同一個主題的多個Leader副本在同一個代理上,可是隨着時間推移好比故障轉移等狀況發送,Leader副本可能不均衡。有兩種方式設置自動平衡,自動和手動。

自動就是在配置文件中增長 auto.leader.rebalance.enable true 若是該項爲false,當某個節點故障恢復並從新上線後,它原來的Leader副本也不會轉移回來,只是一個Follower副本。

手動就是經過命令來執行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分區遷移

當下線一個節點須要將該節點上的分區副本遷移到其餘可用節點上,Kafka並不會自動進行分區遷移,若是不遷移就會致使某些主題數據丟失和不可用的狀況。當增長新節點時,只有新建立的主題纔會分配到新節點上,以前的主題分區不會自動分配到新節點上,由於老的分區在建立時AR列表中沒有這個新節點。

image.png

上面2個主題,每一個主題3個分區,每一個分區3個副本,咱們假設如今代理2要下線,因此咱們要把代理2上的這兩個主題的分區數據遷移出來。

# 1. 在KAFKA目錄的config目錄中創建topics-to-move.json文件
{
    "topics":[
        {
            "topic":"AAA"
        },
        {
            "topic":"BBB"
        }
    ],
    "version":1
}

# 2. 生成分區分配方案,只是生成一個方案信息而後輸出
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

image.png

個命令的原理是從zookeeper中讀取主題元數據信息及制定的有效代理,根據分區副本分配算法從新計算指定主題的分區副本分配方案。把【Proposed partition reassignment configuration】下面的分區方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。

# 3. 執行方案
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

image.png

# 4. 查看進度
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

image.png

查看結果,這裏已經沒有代理0了。

image.png

集羣擴容

上面演示了節點下線的數據遷移,這裏演示一下集羣擴容的數據遷移。咱們仍是用上面兩個主題,假設代理0又從新上線了。其實擴容就是上面的反向操做

# 1. 創建JSON文件
# 該文件和以前的相同


# 2. 生成方案並保存到一個JSON文件中
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

image.png

# 3. 數據遷移,這裏經過--throttle作一個限流操做,若是數據過大會把網絡堵塞。
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

image.png

查看進度和結果

image.png

增長分區

一般在須要提供吞吐量的時候咱們會增長分區,而後若是代理數量不擴大,同時生產者和消費者線程不增大,你擴展了分區也沒有用。

image.png

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

image.png

增長副本

集羣規模擴大而且想對全部主題或者指定主題提升可用性,那麼能夠增長原有主題的副本數量

image.png

上面是3個分區,每一個分區1個副本,咱們如今把每一個分區擴展爲3個副本

# 1. 建立JSON文件 replica-extends.json 
{
	"version": 1,
	"partitions": [{
			"topic": "KafkaTest04",
			"partition": 0,
			"replicas": [0,1,2]
		},
		{
			"topic": "KafkaTest04",
			"partition": 1,
			"replicas": [0,1,2]
		},
		{
			"topic": "KafkaTest04",
			"partition": 2,
			"replicas": [0,1,2]
		}
	]
}
# 2. 執行分區副本從新分配命令
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

image.png

查看狀態

image.png

查看結果

image.png


鏡像操做

Kafka有一個鏡像工具kafka-mirror-maker.sh,用於將一個集羣數據同步到另一個集羣中,這個很是有用,好比機房搬遷就須要進行數據同步。該工具的本質就是建立一個消費者,在源集羣中須要遷移的主題消費數據,而後建立一個生產者,將消費的數據寫入到目標集羣中。

首先建立消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)

# 源kafka集羣代理地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
# 消費者組名
group.id=mirror

其次建立生產者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)

# 目標kafka集羣地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

運行鏡像命令

# 經過 --whitelist 指定須要鏡像的主題,經過  --blacklist 指定不須要鏡像的主題
kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

因爲鏡像操做是啓動一個生產者和消費者,因此數據同步完成後這個生產者和消費者並不會關閉,它會依然等待新數據,因此同步完成之後你須要本身查看,確認完成了則關閉生產者和消費者。

相關文章
相關標籤/搜索