Kafka學習文檔

 

本教程假定您是一隻小白,沒有Kafka 或ZooKeeper 方面的經驗。 Kafka腳本在Unix和Windows平臺有所不一樣,在Windows平臺,請使用 bin\windows\ 而不是bin/, 並將腳本擴展名改成.bat。html

 

 

1.   Kafka概述

 

 

 

1.1.      消息隊列

 

(1)點對點模式(一對一,消費者主動拉取數據,消息收到後消息清除)java

點對點模型一般是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特色是發送到隊列的消息被一個且只有一個接收者接收處理,即便有多個消息監聽者也是如此。react

(2)發佈/訂閱模式(一對多,數據生產後,推送給全部訂閱者)git

發佈訂閱模型則是一個基於推送的消息傳送模型。發佈訂閱模型能夠有多種不一樣的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的全部消息,即便當前訂閱者不可用,處於離線狀態。github

 

 

1.2.      爲何須要消息隊列

1)解耦:web

  容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。算法

2)冗餘:docker

消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。shell

3)擴展性:apache

由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。

4)靈活性 & 峯值處理能力:

在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。

5)可恢復性:

系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。

6)順序保證:

在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)

7)緩衝:

有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。

8)異步通訊:

不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。

 

1.3.      什麼是Kafka

在流式計算中,Kafka通常用來緩存數據,Storm經過消費Kafka的數據進行計算。

1)Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。

2)Kafka最初是由LinkedIn公司開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待的平臺。

3)Kafka是一個分佈式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)稱爲broker。

4)不管是kafka集羣,仍是consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。

 

Kafka架構圖

                     

 

 

 

 

Kafka詳細架構圖

 

1)Producer :消息生產者,就是向kafka broker發消息的客戶端;

2)Consumer :消息消費者,向kafka broker取消息的客戶端;

3)Topic :能夠理解爲一個隊列;

4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製(不是真的複製,是概念上的)到全部的CG,但每一個partion只會把消息發給該CG中的一個consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic;

5)Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic;

6)Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序;

7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka。

 

 

2.   Kafka單節點運行方式

 

Setp 1:下載代碼

下載 kafka_2.12-2.1.0 版本而且解壓。

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz

> tar -xzf kafka_2.11-1.0.0.tgz

> cd kafka_2.11-1.0.0

 

Setp 2:啓動服務

Kafka 使用 ZooKeeper 若是你尚未ZooKeeper服務器,你須要先啓動一個ZooKeeper服務器。 您能夠經過與kafka打包在一塊兒的便捷腳原本快速簡單地建立一個單節點ZooKeeper實例。若是你有使用docker的經驗,你能夠使用docker-compose快速搭建一個zk集羣。

> bin/zookeeper-server-start.sh config/zookeeper.properties

如今啓動Kafka服務器:

> bin/kafka-server-start.sh config/server.properties

後臺啓動:

> bin/kafka-server-start.sh config/server.properties 1>/dev/null  2>&1  &

其中1>/dev/null  2>&1 是將命令產生的輸入和錯誤都輸入到空設備,也就是不輸出的意思。

/dev/null表明空設備。

 

Setp 3:建立一個topic

建立一個名爲「test」的topic,它有一個分區和一個副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

運行list(列表)命令來查看這個topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181 test

除了手工建立topic外,你也能夠配置你的broker,當發佈一個不存在的topic時自動建立topic。

 

Setp 4:發送消息

Kafka自帶一個命令行客戶端,它從文件或標準輸入中獲取輸入,並將其做爲message(消息)發送到Kafka集羣。默認狀況下,每行將做爲單獨的message發送。

運行 producer,而後在控制檯輸入一些消息以發送到服務器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

hello world

Hello study.163.com

 

Setp 5:啓動消費者

Kafka還有一個命令行使用者,它會將消息轉儲到標準輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

hello world

hello study.163.com

若是在不一樣的終端中運行上述命令,可以在生產者終端中鍵入消息並看到它們出如今消費者終端中。

全部命令行工具都有選項; 運行不帶參數的命令將顯示使用信息。

 

 

3.   Kafka集羣部署方式

 

Setp 6:設置多 broker 集羣

到目前,咱們只是單一的運行一個broker,對於Kafka,一個broker僅僅只是一個集羣的大小,接下來咱們來設多個broker。

首先爲每一個broker建立一個配置文件:

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

如今編輯這些新建的文件,設置如下屬性:

config/server-1.properties:

    broker.id=1

    listeners=PLAINTEXT://:9093

    log.dir=/tmp/kafka-logs-1

 

config/server-2.properties:

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dir=/tmp/kafka-logs-2

broker.id屬性是集羣中每一個節點的名稱,這一名稱是惟一且永久的。

咱們已經創建Zookeeper和一個單節點了,如今咱們只須要啓動兩個新的節點:

> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

...

如今建立一個副本爲3的新topic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

運行命令「describe topics」 查看集羣中的topic信息

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

如下是對輸出信息的解釋:第一行給出了全部分區的摘要,下面的每行都給出了一個分區的信息。由於咱們只有一個分區,因此只有一行。

l  「leader」是負責給定分區全部讀寫操做的節點。每一個節點都是隨機選擇的部分分區的領導者。

l  「replicas」是複製分區日誌的節點列表,無論這些節點是leader仍是僅僅活着。

l  「isr」是一組「同步」replicas,是replicas列表的子集,它活着並被指到leader。

請注意,在示例中,節點1是該主題中惟一分區的領導者。

 

咱們運行這個命令,看看一開始咱們建立的那個test節點:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:

    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

這並不奇怪,剛纔建立的主題沒有Replicas,而且在服務器「0」上,咱們建立它的時候,集羣中只有一個服務器,因此是「0」。

發佈一些信息在新的topic上:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

測試集羣的容錯,kill掉leader,Broker1做爲當前的leader,也就是kill掉Broker1。

> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

備份節點之一成爲新的leader,而broker1已經不在同步備份集合裏了。

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

即便最初接受寫入的領導者已經失敗,這些消息仍可供消費:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

 

Setp 7:使用 Kafka Connect 導入/導出數據

Kafka Connect是Kafka的一個工具,它能夠將數據導入和導出到Kafka。它是一種可擴展工具,經過運行connectors(鏈接器), 使用自定義邏輯來實現與外部系統的交互。接下來咱們將學習如何使用簡單的connectors來運行Kafka Connect,這些connectors 將文件中的數據導入到Kafka topic中,並從中導出數據到一個文件。

首先,咱們將建立一些種子數據來進行測試:

> echo -e "allen" > test.txt

> echo -e "tony" >> test.txt

接下來,咱們將啓動兩個standalone(獨立)運行的鏈接器,第一個是源鏈接器,它從輸入文件讀取行並生成Kafka主題,第二個是宿鏈接器從Kafka主題讀取消息並將每一個消息生成爲輸出文件中的一行。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

一旦Kafka Connect進程啓動,源鏈接器應該開始從test.txt主題讀取行並生成它們connect-test,而且接收器鏈接器應該開始從主題讀取消息connect-test 並將它們寫入文件test.sink.txt。咱們能夠經過檢查輸出文件的內容來驗證數據是否已經過整個管道傳遞:

> more test.sink.txt

allen

tony

數據存儲在Kafka主題中connect-test,所以咱們還能夠運行控制檯使用者來查看主題中的數據:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"allen"}

{"schema":{"type":"string","optional":false},"payload":"tony"}

...

鏈接器一直在處理數據,因此咱們能夠將數據添加到文件中,並看到它在pipeline 中移動:

> echo mike >> test.txt

 

 

4.   Kafka Stream

Kafka Streams是一個客戶端庫,用於構建任務關鍵型實時應用程序和微服務,其中輸入和/或輸出數據存儲在Kafka集羣中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集羣技術的優點,使這些應用程序具備高度可擴展性,彈性,容錯性,分佈式等等。

如下是WordCountDemo示例代碼的要點(爲了方便閱讀,使用的是java8 lambda表達式)。

步驟:

1.啓動zk和kafka

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

 

  1. 2.    準備輸入主題並啓動生產者

建立名爲streams-plaintext-input的輸入主題和名爲streams-wordcount-output的輸出主題:

> bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-plaintext-input

Created topic "streams-plaintext-input".

注意:咱們建立輸出主題並啓用壓縮,由於輸出流是更改日誌流

> bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-wordcount-output \

    --config cleanup.policy=compact

Created topic "streams-wordcount-output".

使用相同的kafka-topics工具描述建立的主題:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe

 

  1. 3.    啓動Wordcount應用程序

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程序將從輸入主題stream-plaintext-input讀取,對每一個讀取消息執行WordCount算法的計算,並將其當前結果連續寫入輸出主題streams-wordcount-output

 

  1. 處理數據

開啓一個生產者終端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic

 streams-plaintext-input

all streams lead to kafka

開啓一個消費者終端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

    --topic streams-wordcount-output \

    --from-beginning \

    --formatter kafka.tools.DefaultMessageFormatter \

    --property print.key=true \

    --property print.value=true \

    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1

streams 1

lead    1

to      1

kafka   1

這裏,第一列是java.lang.String格式的Kafka消息鍵,表示正在計數的單詞,第二列是java.lang.Long格式的消息值,表示單詞的最新計數。

 

 

5.   Kafka分片存儲機制

n  複習幾個kafka重要概念:

 

ü  Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。

ü  Topic:一類消息,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。

ü  Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。

ü  Segment:partition物理上由多個segment組成,下面有詳細說明。

ü  offset:每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每一個消息都有一個連續的序列號叫作offset,用於partition中惟一標識的這條消息。

 

n  topic中partition存儲分佈

 

下面示意圖形象說明了partition中文件存儲方式:

 

 

ü  每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。(默認狀況下每一個文件大小爲1G)

 

ü  每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。

 

這樣作的好處就是能快速刪除無用文件,有效提升磁盤利用率。

 

 

6.   Kafka消息分發和消費者push、pull機制

 

6.1.      消息分發

ü  Producer客戶端負責消息的分發

l  kafka集羣中的任何一個broker均可以向producer提供metadata信息,這些metadata中包含」集羣中存活的servers列表」/」partitions leader列表」等信息;

l  當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;

l  消息由producer直接經過socket發送到broker,中間不會通過任何」路由層」,事實上,消息被路由到哪一個partition上由producer客戶端決定;好比能夠採用」random」「key-hash」「輪詢」等,若是一個topic中有多個partitions,那麼在producer端實現」消息均衡分發」是必要的。

l  在producer端的配置文件中,開發者能夠指定partition路由的方式。

 

ü  Producer消息發送的應答機制

設置發送數據是否須要服務端的反饋,有三個值0,1,-1

l  0: producer不會等待broker發送ack

l  1: 當leader接收到消息以後發送ack

l  -1: 當全部的follower都同步消息成功後發送ack

request.required.acks=0

 

6.2.      消費者push、pull機制

做爲一個message system,kafka遵循了傳統的方式,選擇由kafka的producer向broker push信息,而consumer從broker pull信息。

 

consumer獲取消息,能夠使用兩種方式:push或pull模式。下面咱們簡單介紹一下這兩種區別:

 

push模式

 

常見的push模式如storm的消息處理,由spout負責消息的推送。該模式下須要一箇中心節點,負責消息的分配狀況(哪段消息分配給consumer1,哪段消息分配給consumer2),同時還要監聽consumer的ack消息用於判斷消息是否處理成功,若是在timeout時間內爲收到響應能夠認爲該consumer掛掉,須要從新分配sonsumer上失敗的消息。這種模式有個問題,不太容易實現咱們想要的消息回放功能,由於理想狀況下由consumer決定我到底要消費什麼,而這種模式徹底由master決定。

 

pull模式

 

如上圖模式,該模式爲pull模式,由consumer決定消息的消費狀況,這種模式有一個好處是咱們不須要返回ack消息,由於當consumer申請消費下一批消息時就能夠認爲上一批消息已經處理完畢,也不須要處理超時的問題,consumer能夠根據本身的消費能力來消費消息。但這個還有一個問題,如何保證處理的消息的不會重複呢,kafka具體作法就是增長隊列的併發度(partition),能夠一個partition對準一個consumer。

 

綜上,kafka的consumer之因此沒有采用push模式,是由於push模式很難適應消費者速率不一樣的消費者並且很難實現消息的回放功能,由於消息發送速率是由broker決定的。push模式的目標就是儘量以最快速度傳遞消息,可是這樣很容易形成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞,而pull模式則能夠根據consumer的消費能力以適當的速率消費message。

 

pull與push的區別

 

pull技術:

 

客戶機向服務器請求信息;

kafka中,consuemr根據本身的消費能力以適當的速率消費信息;

 

push技術:

 

服務器主動將信息發往客戶端的技術;

push模式的目標就是儘量以最快的速率傳遞消息。

 

 

7.   Kafka持久化

7.1.      概述

不要畏懼文件系統!

 

Kafka大量依賴文件系統去存儲和緩存消息。對於硬盤有個傳統的觀念是硬盤老是很慢,這使不少人懷疑基於文件系統的架構可否提供優異的性能。實際上硬盤的快慢徹底取決於使用它的方式。設計良好的硬盤架構能夠和內存同樣快。

 

在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差很少是600MB/s,可是隨即寫的速度倒是100k/s,差了差很少6000倍。如今的操做系統提供了預讀取和後寫入的技術。實際上發現線性的訪問磁盤,不少時候比隨機的內存訪問快得多。

 

爲了提升性能,現代操做系統每每使用內存做爲磁盤的緩存,現代操做系統樂於把全部空閒內存用做磁盤緩存,雖然這可能在緩存回收和從新分配時犧牲一些性能。全部的磁盤讀寫操做都會通過這個緩存,這不太可能被繞開除非直接使用I/O。因此雖然每一個程序都在本身的線程裏只緩存了一份數據,但在操做系統的緩存裏還有一份,這等於存了兩份數據。

 

u  基於jvm內存有如下缺點:

v  Java對象佔用空間是很是大的,差很少是要存儲的數據的兩倍甚至更高。

v  隨着堆中數據量的增長,垃圾回收回變的愈來愈困難,並且可能致使錯誤

 

基於以上分析,若是把數據緩存在內存裏,由於須要存儲兩份,不得不使用兩倍的內存空間,Kafka基於JVM,又不得不將空間再次加倍,再加上要避免GC帶來的性能影響,在一個32G內存的機器上,不得不使用到28-30G的內存空間。而且當系統重啓的時候,又必需要將數據刷到內存中( 10GB 內存差很少要用10分鐘),就算使用冷刷新(不是一次性刷進內存,而是在使用數據的時候沒有就刷到內存)也會致使最初的時候新能很是慢。

 

u  基於操做系統的文件系統來設計有如下好處:

v  能夠經過os的pagecache來有效利用主內存空間,因爲數據緊湊,能夠cache大量數據,而且沒有gc的壓力

v  即便服務重啓,緩存中的數據也是熱的(不須要預熱)。而基於進程的緩存,須要程序進行預熱,並且會消耗很長的時間。(10G大概須要10分鐘)

v  大大簡化了代碼。由於在緩存和文件系統之間保持一致性的全部邏輯都在OS中。以上建議和設計使得代碼實現起來十分簡單,不須要盡力想辦法去維護內存中的數據,數據會當即寫入磁盤。

 

總的來講,Kafka不會保持儘量多的內容在內存空間,而是儘量把內容直接寫入到磁盤。全部的數據都及時的以持久化日誌的方式寫入到文件系統,而沒必要要把內存中的內容刷新到磁盤中。

 

7.2.      日誌數據持久化特性

寫操做:經過將數據追加到文件中實現

 

讀操做:讀的時候從文件中讀就行了

 

7.3.      優點

ü  讀操做不會阻塞寫操做和其餘操做(由於讀和寫都是追加的形式,都是順序的,不會亂,因此不會發生阻塞),數據大小不對性能產生影響;

ü  沒有容量限制(相對於內存來講)的硬盤空間創建消息系統;

ü  線性訪問磁盤,速度快,能夠保存任意一段時間!

 

7.4.      持久化原理

Topic在邏輯上能夠被認爲是一個queue。每條消費都必須指定它的topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠水平擴展,物理上把topic分紅一個或多個partition,每一個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的全部消息和索引文件

 

 

每一個日誌文件都是「log entries」序列,每個log entry包含一個4字節整型數(值爲N),其後跟N個字節的消息體。每條消息都有一個當前partition下惟一的64字節的offset,它指明瞭這條消息的起始位置。磁盤上存儲的消息格式以下:

消息長度: 4 bytes (value: 1 + 4 + n)

版本號: 1 byte

CRC校驗碼: 4 bytes

具體的消息: n bytes

 

這個「log entries」並不是由一個文件構成,而是分紅多個segment,每一個segment名爲該segment第一條消息的offset和「.kafka」組成。另外會有一個索引文件,它標明瞭每一個segment下包含的log entry的offset範圍,以下圖所示:

 

 

 

 

 

7.5.      索引

稀疏存儲,每隔必定字節的數據創建一條索引(這樣的目的是爲了減小索引文件的大小)。

 

下圖爲一個partition的索引示意圖:

 

 

  1. 如今對6.和8創建了索引,若是要查找7,則會先查找到8而後,再找到8後的一個索引6,而後兩個索引之間作二分法,找到7的位置

 

  1. 每個log文件中又分爲多個segment

 

 

經過調用kafka自帶的工具,能夠看到日誌下的數據信息

> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /root/kafka/kafka-logs/streams-plaintext-input-0/00000000000000000000.log --print-data-log --verify-index-only

 

 

 

kafka日誌分爲index與log,兩個成對出現;index文件存儲元數據(用來描述數據的數據,這也多是爲何index文件這麼大的緣由了),log存儲消息。索引文件元數據指向對應log文件中message的遷移地址;例如2,128指log文件的第2條數據,偏移地址爲128;而物理地址(在index文件中指定)+ 偏移地址能夠定位到消息。

由於每條消息都被append到該partition中,是順序寫磁盤,所以效率很是高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。

 

 

 

 

8.   Kafa API實戰

 

 

 

 

9.   Kafka producer攔截器(interceptor)

9.1.      攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯。

對於producer而言,interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息作一些定製化需求,好比修改消息等。同時,producer容許用戶指定多個interceptor按序做用於同一條消息從而造成一個攔截鏈(interceptor chain)。Intercetpor的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)

獲取配置信息和初始化數據時調用。

(2)onSend(ProducerRecord):

該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。用戶能夠在該方法中對消息作任何操做,但最好保證不要修改消息所屬的topic和分區,不然會影響目標分區的計算

(3)onAcknowledgement(RecordMetadata, Exception):

該方法會在消息被應答或消息發送失敗時調用,而且一般都是在producer回調邏輯觸發以前。onAcknowledgement運行在producer的IO線程中,所以不要在該方法中放入很重的邏輯,不然會拖慢producer的消息發送效率

(4)close:

關閉interceptor,主要用於執行一些資源清理工做

如前所述,interceptor可能被運行在多個線程中,所以在具體實現時用戶須要自行確保線程安全。另外假若指定了多個interceptor,則producer將按照指定順序調用它們,並僅僅是捕獲每一個interceptor可能拋出的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程當中要特別留意。

 

 

9.2.      攔截器案例

1)需求:

實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。

 

 

2)案例實操

(1)增長時間戳攔截器

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

 

public class TimeInterceptor implements ProducerInterceptor<String, String> {

 

           @Override

           public void configure(Map<String, ?> configs) {

 

           }

 

           @Override

           public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

                     // 建立一個新的record,把時間戳寫入消息體的最前部

                     return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),

                                          System.currentTimeMillis() + "," + record.value().toString());

           }

 

           @Override

           public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

 

           }

 

           @Override

           public void close() {

 

           }

}

(2)統計發送消息成功和發送失敗消息數,並在producer關閉時打印這兩個計數器

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

 

public class CounterInterceptor implements ProducerInterceptor<String, String>{

    private int errorCounter = 0;

    private int successCounter = 0;

 

           @Override

           public void configure(Map<String, ?> configs) {

                    

           }

 

           @Override

           public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

                      return record;

           }

 

           @Override

           public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

                     // 統計成功和失敗的次數

        if (exception == null) {

            successCounter++;

        } else {

            errorCounter++;

        }

           }

 

           @Override

           public void close() {

        // 保存結果

        System.out.println("Successful sent: " + successCounter);

        System.out.println("Failed sent: " + errorCounter);

           }

}

(3)producer主程序

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

 

public class InterceptorProducer {

 

           public static void main(String[] args) throws Exception {

                     // 1 設置配置信息

                     Properties props = new Properties();

                     props.put("bootstrap.servers", "localhost:9092");

                     props.put("acks", "all");

                     props.put("retries", 0);

                     props.put("batch.size", 16384);

                     props.put("linger.ms", 1);

                     props.put("buffer.memory", 33554432);

                     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

                    

                     // 2 構建攔截鏈

                     List<String> interceptors = new ArrayList<>();

                     interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");            interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");

                     props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

                      

                     String topic = "first";

                     Producer<String, String> producer = new KafkaProducer<>(props);

                    

                     // 3 發送消息

                     for (int i = 0; i < 10; i++) {

                               

                         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);

                         producer.send(record);

                     }

                      

                     // 4 必定要關閉producer,這樣纔會調用interceptor的close方法

                     producer.close();

           }

}

3)測試

(1)在kafka上啓動消費者,而後運行客戶端java程序。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic first

 

1501904047034,message0

1501904047225,message1

1501904047230,message2

1501904047234,message3

1501904047236,message4

1501904047240,message5

1501904047243,message6

1501904047246,message7

1501904047249,message8

1501904047252,message9

(2)觀察java平臺控制檯輸出數據以下:

Successful sent: 10

Failed sent: 0

 

 

 

 

 

10.        Kafa擴容

 

擴容:增長機器,例如原來三臺服務器的kafka集羣增長兩臺機器成爲有五臺機器的kafka集羣,跟搭建差很少

 

分區從新分配:在原來機器上的主題分區不會自動均衡到新的機器,須要使用分區從新分配工具來均衡均衡

 

從新分配官方文檔地址:http://kafka.apache.org/documentation/#basic_ops_cluster_expansion

 

將服務器添加到Kafka集羣很簡單,只需爲它們分配一個惟一的代理ID,並在新服務器上啓動Kafka。可是,這些新服務器不會自動分配任何數據分區,所以除非將分區移動到它們,不然在建立新主題以前它們不會執行任何工做。所以,一般在將計算機添加到羣集時,您須要將一些現有數據遷移到這些計算機。

遷移數據的過程是手動啓動的,但徹底自動化。在幕後,Kafka將添加新服務器做爲其正在遷移的分區的跟隨者,並容許它徹底複製該分區中的現有數據。當新服務器徹底複製此分區的內容並加入同步副本時,其中一個現有副本將刪除其分區的數據。

 

分區從新分配工具可用於在代理之間移動分區。理想的分區分佈將確保全部代理的均勻數據負載和分區大小。分區從新分配工具沒法自動研究Kafka羣集中的數據分佈並移動分區以實現均勻的負載分配。所以,管理員必須弄清楚應該移動哪些主題或分區。

 

10.1.  自動將數據遷移到新計算機

分區從新分配工具可用於將一些主題從當前的代理集移動到新添加的代理。這在擴展示有集羣時一般頗有用,由於將整個主題移動到新的代理集更容易,而不是一次移動一個分區。當用於執行此操做時,用戶應提供應移至新的代理集的主題列表和新代理的目標列表。而後,該工具在新的代理集中均勻分配給定主題列表的全部分區。在此移動期間,主題的複製因子保持不變。有效地,輸入主題列表的全部分區的副本將從舊的代理集移動到新添加的代理。

例如,如下示例將主題foo1,foo2的全部分區移動到新的代理集5,6。在此移動結束時,主題foo1和foo2的全部分區將僅存在於代理5,6上。

 

因爲該工具接受主題的輸入列表做爲json文件,所以首先須要肯定要移動的主題並建立json文件,以下所示:

{"topics": [{"topic": "foo1"}],

"version":1

}

 

一旦json文件準備就緒,使用分區從新分配工具生成候選分配:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4" --generate

Current partition replica assignment

 

{"version":1,

"partitions":[{"topic":"foo1","partition":1,"replicas":[1,2]},

              {"topic":"foo1","partition":0,"replicas":[1,2]}]

}

 

Proposed partition reassignment configuration

 

{"version":1,

"partitions":[{"topic":"foo1","partition":1,"replicas":[5,6]},

              {"topic":"foo1","partition":0,"replicas":[5,6]}]

}

 

該工具生成一個候選分配,將全部分區從主題foo1,foo2移動到代理5,6。但請注意,此時分區移動還沒有開始,它只是告訴您當前的分配和建議的新分配。應保存當前分配,以防您想要回滾它。新的賦值應保存在json文件中(例如expand-cluster-reassignment.json),以使用--execute選項輸入到工具,以下所示:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

 

最後,--verify選項可與該工具一塊兒使用,以檢查分區從新分配的狀態。請注意,相同的expand-cluster-reassignment.json(與--execute選項一塊兒使用)應與--verify選項一塊兒使用:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Status of partition reassignment:

Reassignment of partition [foo1,0] completed successfully

Reassignment of partition [foo1,1] is in progress

Reassignment of partition [foo1,2] is in progress

Reassignment of partition [foo2,0] completed successfully

Reassignment of partition [foo2,1] completed successfully

Reassignment of partition [foo2,2] completed successfully

 

 

10.2.  減小遷移的數據量

若是要遷移的Topic 有大量數據(假如Topic 默認保留1天的數據),能夠在遷移以前臨時動態地調整retention.ms 來減小數據量,Kafka 會主動purge 掉1個小時以前的數據。

> bin/kafka-topics --zookeeper localhost:2181 --alter --topic sdk_counters --config retention.ms=3600000

 

在遷移完成後,恢復原先設置

> bin/kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms=86400000

 

10.3.  從新制定partition leader

有時候因爲節點down 了,partition 的leader 可能不是咱們但願的那個的,這時,能夠經過kafka-preferred-replica-election 工具將replica 中的第一個節點做爲該分區的leader。

手動編輯topicPartitionList.json 文件,指定要從新分配leader 的分區。

 

{"partitions":[{"topic":"sdk_counters","partition":5}]}

執行命令

> bin/kafka-preferred-replica-election --zookeeper 10.1.1.50:2181/kafka -path-to-json-file ~/kafka/topicPartitionList.json

 

10.4.  中斷遷移任務

從新指定partition leader一旦啓動reassign 腳本,則沒法中止遷移任務。若是須要強制中止,能夠經過zookeeper 進行修改。

> bin/zkCli.sh -server localhost:2181

> delete /admin/reassign_partitions

 

10.5.  自定義分區分配和遷移

分區從新分配工具還可用於選擇性地將分區的副本移動到特定的代理集。當以這種方式使用時,假設用戶知道從新分配計劃而且不須要工具生成候選從新​​分配,有效地跳過 - 生成步驟並直接移動到--execute步驟

例如,如下示例將主題foo1的分區0移動到代理5,6,將主題foo2的分區1移動到代理2,3:

 

第一步是在json文件中手工製做自定義從新分配計劃:

> cat custom-reassignment.json

{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

而後,使用帶有--execute選項的json文件來啓動從新分配過程:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute

Current partition replica assignment

 

{"version":1,

"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},

              {"topic":"foo2","partition":1,"replicas":[3,4]}]

}

 

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions

{"version":1,

"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},

              {"topic":"foo2","partition":1,"replicas":[2,3]}]

}

--verify選項可與該工具一塊兒使用,以檢查分區從新分配的狀態。請注意,相同的expand-cluster-reassignment.json(與--execute選項一塊兒使用)應與--verify選項一塊兒使用:

 

 

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify

Status of partition reassignment:

Reassignment of partition [foo1,0] completed successfully

Reassignment of partition [foo2,1] completed successfully

 

 

11.        優雅的關機

Kafka羣集將自動檢測任何代理關閉或故障,併爲該計算機上的分區選擇新的領導者。不管服務器發生故障仍是故意將其關閉以進行維護或配置更改,都會發生這種狀況。對於後一種狀況,Kafka支持更優雅的機制來中止服務器,而不只僅是殺死服務器。當服務器正常中止時,它有兩個優化:

  1. 它會將全部日誌同步到磁盤,以免在從新啓動時須要執行任何日誌恢復(即驗證日誌尾部全部消息的校驗和)。日誌恢復須要時間,所以加速了故意重啓。

 

  1. 在關閉以前,它會將服務器所領先的任何分區遷移到其餘副本。這將使領導層轉移更快,並將每一個分區不可用的時間縮短到幾毫秒。

 

每當服務器中止而不是硬殺死時,將自動同步日誌,可是leader遷移須要使用特殊的設置:

controlled.shutdown.enable=true

請注意,只有在代理上託管的全部分區都具備副本(即複製因子大於1 且這些副本中至少有一個處於活動狀態)時,受控關閉纔會成功。這一般是您想要的,由於關閉最後一個副本會使該主題分區不可用。

 

 

12.        Leader選舉機制

12.1.  Kafka的Leader是什麼

首先Kafka會將接收到的消息分區(partition),每一個主題(topic)的消息有不一樣的分區。這樣一方面消息的存儲就不會受到單一服務器存儲空間大小的限制,另外一方面消息的處理也能夠在多個服務器上並行。

 

其次爲了保證高可用,每一個分區都會有必定數量的副本(replica)。這樣若是有部分服務器不可用,副本所在的服務器就會接替上來,保證應用的持續性。

 

可是,爲了保證較高的處理效率,消息的讀寫都是在固定的一個副本上完成。這個副本就是所謂的Leader,而其餘副本則是Follower。而Follower則會按期地到Leader上同步數據。

12.2.  Leader選舉

  若是某個分區所在的服務器除了問題,不可用,kafka會從該分區的其餘的副本中選擇一個做爲新的Leader。以後全部的讀寫就會轉移到這個新的Leader上。如今的問題是應當選擇哪一個做爲新的Leader。顯然,只有那些跟Leader保持同步的Follower才應該被選做新的Leader。

  Kafka會在Zookeeper上針對每一個Topic維護一個稱爲ISR(in-sync replica,已同步的副本)的集合,該集合中是一些分區的副本。只有當這些副本都跟Leader中的副本同步了以後,kafka纔會認爲消息已提交,並反饋給消息的生產者。若是這個集合有增減,kafka會更新zookeeper上的記錄。

  若是某個分區的Leader不可用,Kafka就會從ISR集合中選擇一個副本做爲新的Leader。

  顯然經過ISR,kafka須要的冗餘度較低,能夠容忍的失敗數比較高。假設某個topic有f+1個副本,kafka能夠容忍f個服務器不可用。

 

12.3.  具體選舉過程

最簡單最直觀的方案是,leader在zk上建立一個臨時節點,全部Follower對此節點註冊監聽,當leader宕機時,此時ISR裏的全部Follower都嘗試建立該節點,而建立成功者(Zookeeper保證只有一個能建立成功)便是新的Leader,其它Replica即爲Follower。

實際上的實現思路也是這樣,只是優化了下,多了個代理控制管理類(controller)。引入的緣由是,當kafka集羣業務不少,partition達到成千上萬時,當broker宕機時,形成集羣內大量的調整,會形成大量Watch事件被觸發,Zookeeper負載會太重。zk是不適合大量寫操做的。

 

 

contoller,zk,其餘broker交互流程圖

n  Controller提供:

ü  增長刪除topic

ü  更新分區副本數量

ü  選舉分區leader

ü  集羣broker增長和宕機後的調整

ü  自身的選舉controller leader功能

這些功能都是controller經過監聽Zookeeper間接節點出發,而後controller再跟其餘的broker具體的去交互實現的(rpc的方式)。

 

n  controller的內部設計:

當前controller啓動時會爲集羣中全部broker建立一個各自的鏈接。假設你的集羣中有100臺broker,那麼controller啓動時會建立100個Socket鏈接(也包括與它本身的鏈接!)。具體的類NetworkClient類,底層就是Java NIO reactor模型)。Controller會爲每一個鏈接都建立一個對應的請求發送線程(RequestSendThread)。

controller實現如上功能,要先熟悉kafka下zk上的數據存儲結構:

ü  brokers列表:ls /brokers/ids

ü  某個broker信息:get /brokers/ids/0

ü  topic信息:get /brokers/topics/kafka10-topic-xxx

ü  partition信息:get /brokers/topics/kafka10-topic-xxx/partitions/0/state

ü  controller中心節點變動次數:get /controller_epoch

ü  conrtoller leader信息:get /controller

 

 

 

broker機器id

 

 

某個broker信息

 

 

topic信息

 

partition信息

 

conrtoller leader信息

 

12.4.  爲何不用少數服用多數的方法

少數服從多數是一種比較常見的一致性算法和Leader選舉法。它的含義是隻有超過半數的副本同步了,系統纔會認爲數據已同步;選擇Leader時也是從超過半數的同步的副本中選擇。這種算法須要較高的冗餘度。譬如只容許一臺機器失敗,須要有三個副本;而若是隻容忍兩臺機器失敗,則須要五個副本。而kafka的ISR集合方法,分別只須要兩個和三個副本。

若是全部的ISR副本都失敗了怎麼辦

  此時有兩種方法可選,一種是等待ISR集合中的副本復活,一種是選擇任何一個當即可用的副本,而這個副本不必定是在ISR集合中。這兩種方法各有利弊,實際生產中按需選擇。

  若是要等待ISR副本復活,雖然能夠保證一致性,但可能須要很長時間。而若是選擇當即可用的副本,則極可能該副本並不一致。

 

 

13.        監控

雖然目前Apache Kafka已經全面進化成一個流處理平臺,但大多數的用戶依然使用的是其核心功能:消息隊列。對於如何有效地監控和調優Kafka是一個大話題,不少用戶都有這樣的困擾,這章咱們就來討論一下。

 

 

當前沒有一款Kafka監控工具是公認比較優秀的,每一個都有本身的特色但也有些致命的缺陷。

主流的kafka監控工具備:Kafka Manager,Kafka Web Console,Burrow,Kafka Monitor,Kafka Offset Monitor,Kafka Eagle,Confluent Control Center。經過研究,發現主流的三種kafka監控程序分別爲:

l  Kafka Manager

l  Kafka Offset Monitor

l  Kafka Web Console

 

13.1.  Kafka Web Console

https://github.com/claudemamo/kafka-web-console

 

 

使用Kafka Web Console,能夠監控:

l  Brokers列表

l  Kafka 集羣中 Topic列表,及對應的Partition、LogSiz e等信息

l  點擊Topic,能夠瀏覽對應的Consumer Groups、Offset、Lag等信息

l  生產和消費流量圖、消息預覽…

 

 

程序運行後,會定時去讀取kafka集羣分區的日誌長度,讀取完畢後,鏈接沒有正常釋放,一段時間後產生大量的socket鏈接,致使網絡堵塞。

 

 

13.2.  Kafka Manager

https://github.com/yahoo/kafka-manager

 

雅虎開源的Kafka集羣管理工具:

l  管理幾個不一樣的集羣

l  監控集羣的狀態(topics, brokers, 副本分佈, 分區分佈)

l  產生分區分配(Generate partition assignments)基於集羣的當前狀態

l  從新分配分區

 

 

 

下載安裝

  1. 安裝sbt

yum install sbt

 

若是不行,則

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo

sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/

sudo yum install sbt

sbt 二進制文件發佈到 Bintray,而Bintray 方便地提供了RPM資源庫。你只須要將存儲庫添加到你的軟件包管理器將檢查的地方。

 

  1. 下載編譯

git clone https://github.com/yahoo/kafka-managercd kafka-manager

cd kafka-manager

# 由於要編譯。因此下面這步操做要等好久

sbt clean distcd target/

# 在target目錄下咱們能夠看到 kafka-manager

kafka-manager-1.3.0.8.zip

 

提示:

使用sbt編譯打包的時候時間可能會比較長,若是你hang在

Loading project definition from kafka-manager/project

能夠修改project/plugins.sbt中的LogLevel參數

將logLevel := Level.Warn修改成logLevel := Level.Debug

 

  1. 解壓

unzip kafka-manager-1.3.0.8.zip -d /usr/local

cd /usr/local/kafka-manager-1.3.0.8

 

  1. 修改配置conf/application.properties

 

# 若是zk是集羣,這裏填寫多個zk地址

kafka-manager.zkhosts="localhost:2181"

 

  1. 啓動

bin/kafka-manager

kafka-manager 默認的端口是9000,可經過 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &

 

kafka 默認是不開啓JMX監控的,可是kafka-manager支持JMX監控,若是不添加,沒法監控,因此咱們須要配置kafka的JMX端口,並重啓kafka

修改bin/kafka-server-start.sh,添加JMX_PORT參數,添加後樣子以下:

 

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

export JMX_PORT="9999"

Fi

 

這樣便安裝成功了。

 

 

13.3.  Kafka Offset Monitor

Kafka Offset Monitor能夠實時監控:

l  Kafka集羣狀態

l  Topic、Consumer Group列表

l  圖形化展現topic和consumer之間的關係

l  圖形化展現consumer的Offset、Lag等信息

 

 

總結:

經過使用,我的總結以上三種監控程序的優缺點:

 

Kafka Web Console:監控功能較爲全面,能夠預覽消息,監控Offset、Lag等信息,但存在bug,不建議在生產環境中使用。

 

Kafka Manager:偏向Kafka集羣管理,若操做不當,容易致使集羣出現故障。對Kafka實時生產和消費消息是經過JMX實現的。沒有記錄Offset、Lag等信息。

 

KafkaOffsetMonitor:程序一個jar包的形式運行,部署較爲方便。只有監控功能,使用起來也較爲安全。

 

若只須要監控功能,推薦使用KafkaOffsetMonito,若偏重Kafka集羣管理,推薦使用Kafka Manager。

 

由於都是開源程序,穩定性欠缺。故需先了解清楚目前已存在哪些Bug,多測試一下,避免出現相似於Kafka Web Console的問題。

 

---

綜合來說,若只須要監控功能,推薦使用KafkaOffsetMonito,若偏重Kafka集羣管理,推薦使用Kafka Manager。

 

 

14.        調優

Kafka監控的一個主要的目的就是調優Kafka集羣。這裏羅列了一些常見的操做系統級的調優。

 

首先是保證頁緩存的大小——至少要設置頁緩存爲一個日誌段的大小。咱們知道Kafka大量使用頁緩存,只要保證頁緩存足夠大,那麼消費者讀取消息時就有大機率保證它可以直接命中頁緩存中的數據而無需從底層磁盤中讀取。故只要保證頁緩存要知足一個日誌段的大小。

 

第二是調優文件打開數。不少人對這個資源有點畏手畏腳。實際上這是一個很廉價的資源,設置一個比較大的初始值一般都是沒有什麼問題的。

 

第三是調優vm.max_map_count參數。主要適用於Kafka broker上的主題數超多的狀況。Kafka日誌段的索引文件是用映射文件的機制來作的,故若是有超多日誌段的話,這種索引文件數必然是不少的,極易打爆這個資源限制,因此對於這種狀況通常要適當調大這個參數。

 

第四是swap的設置。不少文章說把這個值設爲0,就是徹底禁止swap,我我的不建議這樣,由於當你設置成爲0的時候,一旦你的內存耗盡了,Linux會自動開啓OOM killer而後隨機找一個進程殺掉。這並非咱們但願的處理結果。相反,我建議設置該值爲一個比較接近零的較小值,這樣當個人內存快要耗盡的時候會嘗試開啓一小部分swap,雖然會致使broker變得很是慢,但至少給了用戶發現問題並處理之的機會。

 

第五JVM堆大小。首先鑑於目前Kafka新版本已經不支持Java7了,而Java 8自己不更新了,甚至Java9其實都不作了,直接作Java10了,因此我建議Kafka至少搭配Java8來搭建。至於堆的大小,我的認爲6-10G足矣。若是出現了堆溢出,就提jira給社區,讓他們看究竟是怎樣的問題。由於這種狀況下即便用戶調大heap size,也只是延緩OOM而已,不太可能從根本上解決問題。

 

 

最後,建議使用專屬的多塊磁盤來搭建Kafka集羣。自1.1版本起Kafka正式支持JBOD,所以不必在底層再使用一套RAID了。

 

 

15.        Kafka配置信息

15.1.  Broker配置信息

屬性

默認值

描述

broker.id

 

必填參數,broker的惟一標識

log.dirs

/tmp/kafka-logs

Kafka數據存放的目錄。能夠指定多個目錄,中間用逗號分隔,當新partition被建立的時會被存放到當前存放partition最少的目錄。

port

9092

BrokerServer接受客戶端鏈接的端口號

zookeeper.connect

null

Zookeeper的鏈接串,格式爲:hostname1:port1,hostname2:port2,hostname3:port3。能夠填一個或多個,爲了提升可靠性,建議都填上。注意,此配置容許咱們指定一個zookeeper路徑來存放此kafka集羣的全部數據,爲了與其餘應用集羣區分開,建議在此配置中指定本集羣存放目錄,格式爲:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。須要注意的是,消費者的參數要和此參數一致。

message.max.bytes

1000000

服務器能夠接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一致,不然會由於生產者生產的消息太大致使消費者沒法消費。

num.io.threads

8

服務器用來執行讀寫請求的IO線程數,此參數的數量至少要等於服務器上磁盤的數量。

queued.max.requests

500

I/O線程能夠處理請求的隊列大小,若實際請求數超過此大小,網絡線程將中止接收新的請求。

socket.send.buffer.bytes

100 * 1024

The SO_SNDBUFF buffer the server prefers for socket connections.

socket.receive.buffer.bytes

100 * 1024

The SO_RCVBUFF buffer the server prefers for socket connections.

socket.request.max.bytes

100 * 1024 * 1024

服務器容許請求的最大值, 用來防止內存溢出,其值應該小於 Java heap size.

num.partitions

1

默認partition數量,若是topic在建立時沒有指定partition數量,默認使用此值,建議改成5

log.segment.bytes

1024 * 1024 * 1024

Segment文件的大小,超過此值將會自動新建一個segment,此值能夠被topic級別的參數覆蓋。

log.roll.{ms,hours}

24 * 7 hours

新建segment文件的時間,此值能夠被topic級別的參數覆蓋。

log.retention.{ms,minutes,hours}

7 days

Kafka segment log的保存週期,保存週期超過此時間日誌就會被刪除。此參數能夠被topic級別參數覆蓋。數據量大時,建議減少此值。

log.retention.bytes

-1

每一個partition的最大容量,若數據量超過此值,partition數據將會被刪除。注意這個參數控制的是每一個partition而不是topic。此參數能夠被log級別參數覆蓋。

log.retention.check.interval.ms

5 minutes

刪除策略的檢查週期

auto.create.topics.enable

true

自動建立topic參數,建議此值設置爲false,嚴格控制topic管理,防止生產者錯寫topic。

default.replication.factor

1

默認副本數量,建議改成2。

replica.lag.time.max.ms

10000

在此窗口時間內沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。

replica.lag.max.messages

4000

若是replica節點落後leader節點此值大小的消息數量,leader節點就會將其從ISR中移除。

replica.socket.timeout.ms

30 * 1000

replica向leader發送請求的超時時間。

replica.socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests to the leader for replicating data.

replica.fetch.max.bytes

1024 * 1024

The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.

replica.fetch.wait.max.ms

500

The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.

num.replica.fetchers

1

Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.

fetch.purgatory.purge.interval.requests

1000

The purge interval (in number of requests) of the fetch request purgatory.

zookeeper.session.timeout.ms

6000

ZooKeeper session 超時時間。若是在此時間內server沒有向zookeeper發送心跳,zookeeper就會認爲此節點已掛掉。 此值過低致使節點容易被標記死亡;若過高,.會致使太遲發現節點死亡。

zookeeper.connection.timeout.ms

6000

客戶端鏈接zookeeper的超時時間。

zookeeper.sync.time.ms

2000

H ZK follower落後 ZK leader的時間。

controlled.shutdown.enable

true

容許broker shutdown。若是啓用,broker在關閉本身以前會把它上面的全部leaders轉移到其它brokers上,建議啓用,增長集羣穩定性。

auto.leader.rebalance.enable

true

If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the 「preferred」 replica for each partition if it is available.

leader.imbalance.per.broker.percentage

10

The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.

leader.imbalance.check.interval.seconds

300

The frequency with which to check for leader imbalance.

offset.metadata.max.bytes

4096

The maximum amount of metadata to allow clients to save with their offsets.

connections.max.idle.ms

600000

Idle connections timeout: the server socket processor threads close the connections that idle more than this.

num.recovery.threads.per.data.dir

1

The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

unclean.leader.election.enable

true

Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

delete.topic.enable

false

啓用deletetopic參數,建議設置爲true。

offsets.topic.num.partitions

50

The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).

offsets.topic.retention.minutes

1440

Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.

offsets.retention.check.interval.ms

600000

The frequency at which the offset manager checks for stale offsets.

offsets.topic.replication.factor

3

The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.

offsets.topic.segment.bytes

104857600

Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.

offsets.load.buffer.size

5242880

An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.

offsets.commit.required.acks

-1

The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.

offsets.commit.timeout.ms

5000

The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.

 

 

15.2.  Producer配置信息

屬性

默認值

描述

metadata.broker.list

 

啓動時producer查詢brokers的列表,能夠是集羣中全部brokers的一個子集。注意,這個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker並與之創建socket鏈接。格式是:host1:port1,host2:port2。

request.required.acks

0

參見3.2節介紹

request.timeout.ms

10000

Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。

producer.type

sync

同步異步模式。async表示異步,sync表示同步。若是設置成異步模式,能夠容許生產者以batch的形式push數據,這樣會極大的提升broker性能,推薦設置爲異步。

serializer.class

kafka.serializer.DefaultEncoder

序列號類,.默認序列化成 byte[] 。

key.serializer.class

 

Key的序列化類,默認同上。

partitioner.class

kafka.producer.DefaultPartitioner

Partition類,默認對key進行hash。

compression.codec

none

指定producer消息的壓縮格式,可選參數爲: 「none」, 「gzip」 and 「snappy」。關於壓縮參見4.1節

compressed.topics

null

啓用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那麼壓縮僅對本參數指定的topic有效,若本參數爲空,則對全部topic有效。

message.send.max.retries

3

Producer發送失敗時重試次數。若網絡出現問題,可能會致使不斷重試。

retry.backoff.ms

100

Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.

topic.metadata.refresh.interval.ms

600 * 1000

The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed

queue.buffering.max.ms

5000

啓用異步模式時,producer緩存消息的時間。好比咱們設置成1000時,它會緩存1秒的數據再一次發送出去,這樣能夠極大的增長broker吞吐量,但也會形成時效性的下降。

queue.buffering.max.messages

10000

採用異步模式時producer buffer 隊列裏最大緩存的消息數量,若是超過這個數值,producer就會阻塞或者丟掉消息。

queue.enqueue.timeout.ms

-1

當達到上面參數值時producer阻塞等待的時間。若是值設置爲0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置爲-1,producer會被阻塞,不會丟消息。

batch.num.messages

200

採用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer纔會發送消息。

send.buffer.bytes

100 * 1024

Socket write buffer size

client.id

「」

The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

 

 

15.3.  Consumer配置信息

屬性

默認值

描述

group.id

 

Consumer的組ID,相同goup.id的consumer屬於同一個組。

zookeeper.connect

 

Consumer的zookeeper鏈接串,要和broker的配置一致。

consumer.id

null

若是不設置會自動生成。

socket.timeout.ms

30 * 1000

網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 肯定。

socket.receive.buffer.bytes

64 * 1024

The socket receive buffer for network requests.

fetch.message.max.bytes

1024 * 1024

查詢topic-partition時容許的最大消息大小。consumer會爲每一個partition緩存此大小的消息到內存,所以,這個參數能夠控制consumer的內存使用量。這個值應該至少比server容許的最大消息大小大,以避免producer發送的消息大於consumer容許的消息。

num.consumer.fetchers

1

The number fetcher threads used to fetch data.

auto.commit.enable

true

若是此值設置爲true,consumer會週期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啓以後將會使用此值做爲新開始消費的值。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的週期。

queued.max.message.chunks

2

用來被consumer消費的message chunks 數量, 每一個chunk能夠緩存fetch.message.max.bytes大小的數據量。

auto.commit.interval.ms

60 * 1000

Consumer提交offset值到zookeeper的週期。

queued.max.message.chunks

2

用來被consumer消費的message chunks 數量, 每一個chunk能夠緩存fetch.message.max.bytes大小的數據量。

fetch.min.bytes

1

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.

fetch.wait.max.ms

100

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.

rebalance.backoff.ms

2000

Backoff time between retries during rebalance.

refresh.leader.backoff.ms

200

Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.

auto.offset.reset

largest

What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer

consumer.timeout.ms

-1

若在指定時間內沒有消息消費,consumer將會拋出異常。

exclude.internal.topics

true

Whether messages from internal topics (such as offsets) should be exposed to the consumer.

zookeeper.session.timeout.ms

6000

ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.

zookeeper.connection.timeout.ms

6000

The max time that the client waits while establishing a connection to zookeeper.

zookeeper.sync.time.ms

2000

How far a ZK follower can be behind a ZK leader

 

 

 

 

16.        Kafka調優的四個層面

Kafka調優一般能夠從4個維度展開,分別是吞吐量、延遲、持久性和可用性。在具體展開這些方面以前,我想先建議用戶保證客戶端與服務器端版本一致。若是版本不一致,就會出現向下轉化的問題。舉個例子,服務器端保存高版本的消息,當低版本消費者請求數據時,服務器端就要作轉化,先把高版本消息轉成低版本再發送給消費者。這件事情自己就很是很是低效。不少文章都討論過Kafka速度快的緣由,其中就談到了零拷貝技術——即數據不須要在頁緩存和堆緩存中來回拷貝。

 

簡單來講producer把生產的消息放到頁緩存上,若是兩邊版本一致,能夠直接把此消息推給Consumer,或者Consumer直接拉取,這個過程是不須要把消息再放到堆緩存。可是你要作向下轉化或者版本不一致的話,就要額外把數據再堆上,而後再放回到Consumer上,速度特別慢。

 

16.1.  Kafka調優-吞吐量

調優吞吐量就是咱們想用更短的時間作更多的事情。這裏列出了客戶端須要調整的參數。前面說過了producer是把消息放在緩存區,後端Sender線程從緩存區拿出來發到broker。這裏面涉及到一個打包的過程,它是批處理的操做,不是一條一條發送的。所以這個包的大小就和TPS息息相關。一般狀況下調大這個值都會讓TPS提高,可是也不會無限制的增長。不過調高此值的劣處在於消息延遲的增長。除了調整batch.size,設置壓縮也能夠提高TPS,它可以減小網絡傳輸IO。當前Lz4的壓縮效果是最好的,若是客戶端機器CPU資源很充足那麼建議開啓壓縮。

 

 

對於消費者端而言,調優TPS並無太好的辦法,可以想到的就是調整fetch.min.bytes。適當地增長該參數的值可以提高consumer端的TPS。對於Broker端而言,一般的瓶頸在於副本拉取消息時間過長,所以能夠適當地增長num.replica.fetcher值,利用多個線程同時拉取數據,能夠加快這一進程。

 

16.2.  Kafka調優-延時

所謂的延時就是指消息被處理的時間。某些狀況下咱們天然是但願越快越好。針對這方面的調優,consumer端能作的很少,簡單保持fetch.min.bytes默認值便可,這樣能夠保證consumer可以當即返回讀取到的數據。講到這裏,可能有人會有這樣的疑問:TPS和延時不是一回事嗎?假設發一條消息延時是2ms,TPS天然就是500了,由於一秒只能發500消息,其實這二者關係並非簡單的。由於我發一條消息2毫秒,可是若是把消息緩存起來統一發,TPS會提高不少。假設發一條消息依然是2ms,可是我先等8毫秒,在這8毫秒以內可能能收集到一萬條消息,而後我再發。至關於你在10毫秒內發了一萬條消息,你們能夠算一下TPS是多少。事實上,Kafka producer在設計上就是這樣的實現原理。

 

16.3.  Kafka調優-消息持久性

消息持久化本質上就是消息不丟失。Kafka對消息不丟失的承諾是有條件的。之前碰到不少人說我給Kafka發消息,發送失敗,消息丟失了,怎麼辦?嚴格來講Kafka不認爲這種狀況屬於消息丟失,由於此時消息沒有放到Kafka裏面。Kafka只對已經提交的消息作有條件的不丟失保障。

 

若是要調優持久性,對於producer而言,首先要設置重試以防止由於網絡出現瞬時抖動形成消息發送失敗。一旦開啓了重試,還須要防止亂序的問題。好比說我發送消息1與2,消息2發送成功,消息1發送失敗重試,這樣消息1就在消息2以後進入Kafka,也就是形成亂序了。若是用戶不容許出現這樣的狀況,那麼還須要顯式地設置max.in.flight.requests.per.connection爲1。

 

 

其餘參數都是很常規的參數,好比unclean.leader.election.enable參數,最好仍是將其設置成false,即不容許「髒」副本被選舉爲leader。

 

16.4.  Kafka調優-可用性

最後是可用性,與剛纔的持久性是相反的,我容許消息丟失,只要保證系統高可用性便可。所以我須要把consumer心跳超時設置爲一個比較小的值,若是給定時間內消費者沒有處理完消息,該實例可能就被踢出消費者組。我想要其餘消費者更快地知道這個決定,所以調小這個參數的值。

 

 

17.        定位性能瓶頸

下面就是性能瓶頸,嚴格來講這不是調優,這是解決性能問題。對於生產者來講,若是要定位發送消息的瓶頸很慢,咱們須要拆解發送過程當中的各個步驟。就像這張圖表示的那樣,消息的發送共有6步。

第一步就是生產者把消息放到Broker,

第2、三步就是Broker把消息拿到以後,寫到本地磁盤上,

第3、第四步是follower broker從Leader拉取消息,

第4、第五步是建立response,

第5、第六步是發送回去,告訴我已經處理完了。

 

 

這六步當中你須要肯定瓶頸在哪?怎麼肯定?

——

經過不一樣的JMX指標。

好比說步驟1是慢的,可能你常常碰到超時,你若是在日誌裏面常常碰到request timeout,就表示1是很慢的,此時要適當增長超時的時間。

 

若是二、3慢的狀況下,則可能體如今磁盤IO很是高,致使往磁盤上寫數據很是慢。

 

假若是步驟4慢的話,查看名爲remote-time的JMX指標,此時能夠增長fetcher線程的數量。

 

若是5慢的話,表現爲response在隊列致使待的時間過長,這時能夠增長網絡線程池的大小。

 

6與1是同樣的,若是你發現一、6常常出問題的話,查一下你的網絡。

因此,就這樣來分解整個的耗時。這是到底哪一步的瓶頸在哪,須要看看什麼樣的指標,作怎樣的調優。

 

 

18.        Java Consumer調優

最後說一下Consumer的調優。目前消費者有兩種使用方式,一種是同一個線程裏面就直接處理,另外一種是我採用單獨的線程,consumer線程只是作獲取消息,消息真正的處理邏輯放到單獨的線程池中作。這兩種方式有不一樣的使用場景:第一種方法實現較簡單,由於你的消息處理邏輯直接寫在一個線程裏面就能夠了,可是它的缺陷在於TPS可能不會很高,特別是當你的客戶端的機器很是強的時候,你用單線程處理的時候是很慢的,由於你沒有充分利用線程上的CPU資源。第二種方法的優點是可以充分利用底層服務器的硬件資源,TPS能夠作的很高,可是處理提交位移將會很難。

 

最後說一下參數,也是網上問的最多的,這幾個參數究竟是作什麼的。第一個參數,就是控制consumer單次處理消息的最大時間。好比說設定的是600s,那麼consumer給你10分鐘來處理。若是10分鐘內consumer沒法處理完成,那麼coordinator就會認爲此consumer已死,從而開啓rebalance。

 

Coordinator是用來管理消費者組的協調者,協調者如何在有效的時間內,把消費者實例掛掉的消息傳遞給其餘消費者,就靠心跳請求,所以能夠設置heartbeat.interval.ms爲一個較小的值,好比5s。

 

 

19.        Kafka命令大全

 

整理kafka相關的經常使用命令

19.1.  管理

## 建立主題(4個分區,2個副本)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

 

19.2.  查詢

## 查詢集羣描述

bin/kafka-topics.sh --describe --zookeeper

 

## 消費者列表查詢

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

 

## 新消費者列表查詢(支持0.9版本+)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

 

## 顯示某個消費組的消費詳情(僅支持offset存儲在zookeeper上的)

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

 

## 顯示某個消費組的消費詳情(支持0.9版本+)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

 

19.3.  發送和消費

## 生產者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

## 消費者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

 

## 新生產者(支持0.9版本+)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

 

## 新消費者(支持0.9版本+)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

 

## 高級點的用法

bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

 

19.4.  平衡leader

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

 

19.5.  Kafka自帶壓測命令

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

 

19.6.  增長副本

1.建立規則json

cat > increase-replication-factor.json <<EOF

{"version":1, "partitions":[

{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},

{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]}]

}

EOF

 

2.執行

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

 

3.驗證

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

相關文章
相關標籤/搜索