目錄apache
@(博客文章)[kafka|大數據]編程
本系統文章共三篇,分別爲
一、kafka集羣原理介紹瞭如下幾個方面的內容:
(1)kafka基礎理論
(2)參數配置
(3)錯誤處理
(4)kafka集羣在zookeeper集羣中的內容json
二、kafka集羣操做介紹了kafka集羣的安裝與操做
(1)單機版安裝
(2)集羣安裝
(3)集羣啓停操做
(4)topic相關操做
(5)某個broker掛掉,重啓本機器
(6)某個broker掛掉且沒法重啓,使用其它機器代替
(7)擴容
(8)數據遷移
(9)機器下線
(10)增長副本數量
(11)平衡leader服務器
三、kafka集羣編程介紹了...併發
此部分不可用於生產,但新接觸kafka時,能夠先有個感性的認識分佈式
Step 1: 下載Kafkaoop
下載最新的版本並解壓.大數據
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz $ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 啓動服務
Kafka用到了Zookeeper,全部首先啓動Zookper,下面簡單的啓用一個單實例的Zookkeeper服務。能夠在命令的結尾加個&符號,這樣就能夠啓動後離開控制檯。this
> bin/zookeeper-server-start.sh config/zookeeper.properties &
如今啓動Kafka:編碼
bin/kafka-server-start.sh config/server.properties
Step 3: 建立 topic
建立一個叫作「test」的topic,它只有一個分區,一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test [2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket) Created topic "test".
能夠經過list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 > test
除了手動建立topic,還能夠配置broker讓它自動建立topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。
運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a messageThis is another message
ctrl+c能夠退出發送。
默認狀況下,日誌數據會被放置到/tmp/kafka-logs中,每一個分區一個目錄
Step 5: 啓動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message This is another message
你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。
注意,必須先搭建zookeeper集羣
一、使用3臺機器搭建Kafka集羣:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
二、在安裝Kafka集羣以前,這裏沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper集羣,也是使用這3臺機器,保證Zookeeper集羣正常運行。
三、首先,在gdc-dn01-test上準備Kafka安裝文件,執行以下命令:
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz tar xvzf kafka_2.10-0.8.2.1.tgz mv kafka_2.10-0.8.2.1 kafka
四、修改配置文件kafka/config/server.properties,修改以下內容:
broker.id=0 zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
這裏須要說明的是,默認Kafka會使用ZooKeeper默認的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,若是 你有其餘的應用也在使用ZooKeeper集羣,查看ZooKeeper中數據可能會不直觀,因此強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
並且,須要手動在ZooKeeper中建立路徑/kafka,使用以下命令鏈接到任意一臺ZooKeeper服務器:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper執行以下命令建立chroot路徑:
create /kafka ''
這樣,每次鏈接Kafka集羣的時候(使用--zookeeper選項),也必須使用帶chroot路徑的鏈接字符串,後面會看到。
五、而後,將配置好的安裝文件同步到其餘的dn0二、dn03節點上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
六、最後,在dn0二、dn03節點上配置修改配置文件kafka/config/server.properties內容以下所示:
broker.id=1 # 在dn02修改 broker.id=2 # 在dn03修改
由於Kafka集羣須要保證各個Broker的id在整個集羣中必須惟一,須要調整這個配置項的值(若是在單機上,能夠經過創建多個Broker進程來模擬分佈式的Kafka集羣,也須要Broker的id惟一,還須要修改一些配置目錄的信息)。
七、在集羣中的dn0一、dn0二、dn03這三個節點上分別啓動Kafka,分別執行以下命令:
bin/kafka-server-start.sh config/server.properties &
能夠經過查看日誌,或者檢查進程狀態,保證Kafka集羣啓動成功。
八、建立一個名稱爲my-replicated-topic5的Topic,5個分區,而且複製因子爲3,執行以下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic,執行以下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
結果信息以下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 上面Leader、Replicas、Isr的含義以下:
1 Partition: 分區
2 Leader : 負責讀寫指定分區的節點
3 Replicas : 複製該分區log的節點列表
4 Isr : "in-sync" replicas,當前活躍的副本列表(是一個子集),而且可能成爲Leader
咱們能夠經過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示若是發佈消息、消費消息。
十一、在一個終端,啓動Producer,並向咱們上面建立的名稱爲my-replicated-topic5的Topic中生產消息,執行以下腳本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
十二、在另外一個終端,啓動Consumer,並訂閱咱們上面建立的名稱爲my-replicated-topic5的Topic中生產的消息,執行以下腳本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
能夠在Producer終端上輸入字符串消息行,就能夠在Consumer終端上看到消費者消費的消息內容。
也能夠參考Kafka的Producer和Consumer的Java API,經過API編碼的方式來實現消息生產和消費的處理邏輯。
一、啓動集羣
bin/kafka-server-start.sh config/server.properties &
二、中止集羣
bin/kafka-server-stop.sh
三、重啓
沒有專用腳本,先停後啓便可
注:固然也可使用kill命令來關閉,但使用腳本有如下好處:
(1)It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
(2)It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.
一、建立topic
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic
(1)zookeeper指定其中一個節點便可,集羣之間會自動同步。
(2)--replication-factor 2 --partitions 3理論上應該是可選參數,但此腳本必須寫這2個參數。
(3)還可使用--config <name=value>來指定topic的某個具體參數,以代替配置文件中的參數。如:
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic retention.bytes=3298534883328
指定了某個topic最大的保留日誌量,單位是字節。
二、查看所有topic
bin/kafka-topics.sh --list --zookeeper 192.168.172.98:2181/kafka
三、查看某個topic的詳細信息
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,4 Isr: 3,4 Topic: test_topic Partition: 1 Leader: 4 Replicas: 4,5 Isr: 4,5 Topic: test_topic Partition: 2 Leader: 5 Replicas: 5,2 Isr: 5,2
(1)第一行列出了這個topic的整體狀況,如topic名稱,分區數量,副本數量等。
(2)第二行開始,每一行列出了一個分區的信息,如它是第幾個分區,這個分區的leader是哪一個broker,副本位於哪些broker,有哪些副本處理同步狀態。
四、啓動一個console producer,用於在console中模擬輸入消息
bin/kafka-console-producer.sh --broker-list 192.168.172.111:9092 --topic test_topic
五、啓動一個console consumer,用於模擬接收消息,並在console中輸出
bin/kafka-console-consumer.sh --zookeeper 192.168.172.111:2181/kafka --topic test_topic
此腳本能夠用於驗證一個topic的數據狀況,看消息是否正常流入等。
六、刪除一個topic
bin/kafka-topics.sh --delete --zookeeper 192.168.172.98:2181/kafka --topic test_topic
(1)配置文件中必須delete.topic.enable=true,不然只會標記爲刪除,而不是真正刪除。
(2)執行此腳本的時候,topic的數據會同時被刪除。若是因爲某些緣由致使topic的數據不能徹底刪除(如其中一個broker down了),此時topic只會被marked for deletion,而不會真正刪除。此時建立同名的topic會有衝突。
七、修改topic
使用—-alert原則上能夠修改任何配置,如下列出了一些經常使用的修改選項:
(1)改變分區數量
bin/kafka-topics.sh --alter --zookeeper 192.168.172.98:2181/kafka --topic test_topic --partitions 4
(2)增長、修改或者刪除一個配置參數
bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --config key=value bin/kafka-topics.sh —alter --zookeeper 192.168.172.98:2181/kafka --topic my_topic_name --deleteConfig key
【結論】若是一個broker掛掉,且能夠重啓則處理步驟以下:
(1)重啓kafka進程
(2)執行rebalance(因爲已經設置配置項自動執行balance,所以此步驟通常可忽略)
詳細分析見下面操做過程。
一、topic的狀況
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 5,2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
集羣中有4臺機器,id爲【2-5】,topic 有3個分區,每一個分區2個副本,leader分別位於2,3,5中。
二、模擬機器down,kill掉進程
分區0的leader位於id=5的broker中,kill掉這臺機器的kafka進程
kill -9 ****
三、再次查看topic的狀況
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
能夠看到,分區0的leader已經移到id=2的機器上了,它的副本位於2,5這2臺機器上,但處於同步狀態的只有id=2這臺機器。
四、重啓kafka進程
bin/kafka-server-start.sh config/server.properties &
五、再次查看狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
發現分區0的2個副本都已經處於同步狀態,但leader依然爲id=2的broker。
六、執行leader平衡
詳見leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
若是配置文件中
auto.leader.rebalance.enable=true
則此步驟不須要執行。
七、從新查看topic
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
此時leader已經回到了id=5的broker,一切恢復正常。
【結論】當一個broker掛掉,須要換機器時,採用如下步驟:
一、將新機器kafka配置文件中的broker.id設置爲與原機器同樣
二、啓動kafka,注意kafka保存數據的目錄不會自動建立,須要手工建立
詳細分析過程以下:
一、初始化機器,主要包括用戶建立,kafka文件的複製等。
二、修改config/server.properties文件
注意,只須要修改一個配置broker.id,且此配置必須與掛掉的那臺機器相同,由於kafka是經過broker.id來區分集羣中的機器的。此處設爲
broker.id=5
三、查看topic的當前狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
當前topic有3個分區,其中分區1的leader位於id=5的機器上。
四、關掉id=5的機器
kill -9 ** 用於模擬機器忽然down
或者:
bin/kafka-server-stop.sh
用於正常關閉
五、查看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
可見,topic的分區0的leader已經遷移到了id=2的機器上,且處於同步的機器只有一個了。
六、啓動新機器
nohup bin/kafka-server-start.sh config/server.properties &
七、再看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
id=5的機器也處於同步狀態了,但還須要將leader恢復到這臺機器上。
八、執行leader平衡
詳見leader的平衡部分。
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
若是配置文件中
auto.leader.rebalance.enable=true
則此步驟不須要執行。
九、done
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
全部內容都恢復了
將一臺機器加入kafka集羣很容易,只須要爲它分配一個獨立的broker id,而後啓動它便可。可是這些新加入的機器上面並無任何的分區數據,因此除非將現有數據移動這些機器上,不然它不會作任何工做,直到建立新topic。所以,當你往集羣加入機器時,你應該將其它機器上的一部分數據往這臺機器遷移。
數據遷移的工做須要手工初始化,而後自動完成。它的原理以下:當新機器起來後,kafka將其它機器的一些分區複製到這個機器上,並做爲follower,當這個新機器完成複製併成爲in-sync狀態後,那些被複制的分區的一個副本會被刪除。(都不會成爲leader?)
一、將新機器kafka配置文件中的broker.id設置爲與原機器同樣
二、啓動kafka,注意kafka保存數據的目錄不會自動建立,須要手工建立
此時新建的topic都會優先分配leader到新增的機器上,但原有的topic不會將分區遷移過來。
三、數據遷移,請見數據遷移部分。
如下步驟用於將現有數據遷移到新的broker中,假設須要將test_topic與streaming_ma30_sdc的所有分區遷移到新的broker中(id 爲6和7)
一、建立一個json文件,用於指定哪些topic將被遷移過去
cat topics-to-move.json
{"topics": [ {"topic": "test_topic"}, {"topic": "streaming_ma30_sdc"} ], "version":1 }
注意全角,半角符號,或者中文引號之類的問題。
二、先generate遷移後的結果,檢查一下是否是你要想的效果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" —generate Current partition replica assignment {"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}
分別列出了當前的狀態以及遷移後的狀態。
把這2個json分別保存下來,第一個用來萬一須要roll back的時候使用,第二個用來執行遷移。
三、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute 其中expand-cluster-reassignment.json爲保存上面第二段json的文件。
四、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [streaming_ma30_sdc,0] is still in progress Reassignment of partition [streaming_ma30_sdc,4] is still in progress Reassignment of partition [test_topic,2] completed successfully Reassignment of partition [test_topic,0] completed successfully Reassignment of partition [streaming_ma30_sdc,3] is still in progress Reassignment of partition [streaming_ma30_sdc,1] is still in progress Reassignment of partition [test_topic,1] completed successfully Reassignment of partition [streaming_ma30_sdc,2] is still in progress
五、當全部遷移的完成後,查看一下結果是否是你想要的
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: test_topic Partition: 0 Leader: 7 Replicas: 7,6 Isr: 6,7 Topic: test_topic Partition: 1 Leader: 6 Replicas: 6,7 Isr: 6,7 Topic: test_topic Partition: 2 Leader: 7 Replicas: 7,6 Isr: 6,7
完成
以上步驟將整個topic遷移,也能夠只遷移其中一個或者多個分區。
如下將test_topic的分區1移到broker id爲2,3的機器,分區2移到broker id爲4,5的機器.
【其實仍是整個topic遷移好一點,否則準備遷移文件會很麻煩】
一、準備遷移配置文件
cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[4,5]}]}
三、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute
四、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify
五、查看遷移結果
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic
當一個機器下線時,kafka並不會自動將這臺機器上的副本遷移到其它機器上,所以,咱們須要手工進行遷移。這個過程會至關的無聊,kafka打算在0.8.2版本中添加此特性。
有了嗎?再找找。若是隻是替換機器則不會有這個問題。
Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file-
cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
Then, use the json file with the --execute option to start the reassignment process-
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool-
bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
當一個broker down掉時,全部原本將它做爲leader的分區會被將leader轉移到其它broker。這意味着當這個broker重啓時,它將再也不擔任何分區的leader,kafka的client也不會從這個broker來讀取消息,致使資源的浪費。
爲了不這種狀況的發生,kafka增長了一個標記:優先副本(preferred replicas)。若是一個分區有3個副本,且這3個副本的優先級別分別爲1,5,9,則1會做爲leader。爲了使kafka集羣恢復默認的leader,須要運行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.172.98:2181/kafka
或者能夠設置如下配置項,leader 會自動執行balance:
auto.leader.rebalance.enable=true
這配置默認即爲空,但須要通過一段時間後纔會觸發,約半小時。