Kafka深度解析

本文轉發自Jason’s Blog原文連接 http://www.jasongj.com/2015/01/02/Kafka深度解析html

背景介紹

Kafka簡介

  Kafka是一種分佈式的,基於發佈/訂閱的消息系統。主要設計目標以下:前端

  • 以時間複雜度爲O(1)的方式提供消息持久化能力,即便對TB級以上數據也能保證常數時間的訪問性能
  • 高吞吐率。即便在很是廉價的商用機器上也能作到單機支持每秒100K條消息的傳輸
  • 支持Kafka Server間的消息分區,及分佈式消費,同時保證每一個partition內的消息順序傳輸
  • 同時支持離線數據處理和實時數據處理

爲何要用消息系統

  • 解耦
    在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束java

  • 冗餘
    有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。node

  • 擴展性
    由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。git

  • 靈活性 & 峯值處理能力
    在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。github

  • 可恢復性
    當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。web

  • 送達保證
    消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個」只送達一次」保證。不管有多少進程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是」預約」了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。算法

  • 順序保證
    在大多使用場景下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。IronMO保證消息經過FIFO(先進先出)的順序來處理,所以消息在隊列中的位置就是從隊列中檢索他們的位置。數據庫

  • 緩衝
    在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行–寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。apache

  • 理解數據流
    在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息隊列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。

  • 異步通訊
    不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。

經常使用Message Queue對比

  • RabbitMQ
    RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量級,更適合於企業級的開發。同時實現了Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。

  • Redis
    Redis是一個基於Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。實驗代表:入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。

  • ZeroMQ
    ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演了這個服務角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。其中,Twitter的Storm 0.9.0之前的版本中默認使用ZeroMQ做爲數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty做爲傳輸模塊)。

  • ActiveMQ
    ActiveMQ是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。

  • Kafka/Jafka
    Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制來統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。

Kafka解析

Terminology

  • Broker
    Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker
  • Topic
    每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲topic。(物理上不一樣topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic便可生產或消費數據而沒必要關心數據存於何處)
  • Partition
    parition是物理上的概念,每一個topic包含一個或多個partition,建立topic時可指定parition數量。每一個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件
  • Producer
    負責發佈消息到Kafka broker
  • Consumer
    消費消息。每一個consumer屬於一個特定的consumer group(可爲每一個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

Kafka架構

kafka architecture 架構
  如上圖所示,一個典型的kafka集羣中包含若干producer(能夠是web前端產生的page view,或者是服務器日誌,系統CPU、memory等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干consumer group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發佈到broker,consumer使用pull模式從broker訂閱並消費消息。   

Push vs. Pull

  做爲一個messaging system,Kafka遵循了傳統的方式,選擇由producer向broker push消息並由consumer從broker pull消息。一些logging-centric system,好比Facebook的Scribe和Cloudera的Flume,採用很是不一樣的push模式。事實上,push模式和pull模式各有優劣。
  push模式很難適應消費速率不一樣的消費者,由於消息發送速率是由broker決定的。push模式的目標是儘量以最快速度傳遞消息,可是這樣很容易形成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則能夠根據consumer的消費能力以適當的速率消費消息。

Topic & Partition

  Topic在邏輯上能夠被認爲是一個queue。每條消費都必須指定它的topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠水平擴展,物理上把topic分紅一個或多個partition,每一個partition在物理上對應一個文件夾,該文件夾下存儲這個partition的全部消息和索引文件。
  kafka topic partition
  每一個日誌文件都是「log entries」序列,每個log entry包含一個4字節整型數(值爲N),其後跟N個字節的消息體。每條消息都有一個當前partition下惟一的64字節的offset,它指明瞭這條消息的起始位置。磁盤上存儲的消息格式以下:
  message length : 4 bytes (value: 1+4+n)
  「magic」 value : 1 byte
  crc : 4 bytes
  payload : n bytes
  這個「log entries」並不是由一個文件構成,而是分紅多個segment,每一個segment名爲該segment第一條消息的offset和「.kafka」組成。另外會有一個索引文件,它標明瞭每一個segment下包含的log entry的offset範圍,以下圖所示。
  kafka partition segment
  由於每條消息都被append到該partition中,是順序寫磁盤,所以效率很是高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
  kafka partition
  每一條消息被髮送到broker時,會根據paritition規則選擇被存儲到哪個partition。若是partition規則設置的合理,全部消息能夠均勻分佈到不一樣的partition裏,這樣就實現了水平擴展。(若是一個topic對應一個文件,那這個文件所在的機器I/O將會成爲這個topic的性能瓶頸,而partition解決了這個問題)。在建立topic時能夠在$KAFKA_HOME/config/server.properties中指定這個partition的數量(以下所示),固然也能夠在topic建立以後去修改parition數量。

1
2
3
4
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

 

  在發送一條消息時,能夠指定這條消息的key,producer根據這個key和partition機制來判斷將這條消息發送到哪一個parition。paritition機制能夠經過指定producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中若是key能夠被解析爲整數則將對應的整數與partition總數取餘,該消息會被髮送到該數對應的partition。(每一個parition都會有個序號)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

public JasonPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
 
  若是將上例中的class做爲partition.class,並經過以下代碼發送20條消息(key分別爲0,1,2,3)至topic2(包含4個partition)。
  

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
   List messageList = new ArrayList<KeyedMessage<String, String>>();
   for(int j = 0; j < 4; j++){
   messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
   }
   producer.send(messageList);
}
  producer.close();
}

 

  則key相同的消息會被髮送並存儲到同一個partition裏,並且key的序號正好和partition序號相同。(partition序號從0開始,本例中的key也正好從0開始)。以下圖所示。
  kafka partition key
  對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際上也不必),所以Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。例如能夠經過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的數據,也可經過配置讓Kafka在partition文件超過1GB時刪除舊數據,以下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  ############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false

 

  這裏要注意,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會爲每個consumer group保留一些metadata信息–當前消費的消息的position,也即offset。這個offset由consumer控制。正常狀況下consumer會在消費完一條消息後線性增長這個offset。固然,consumer也可將offset設成一個較小的值,從新消費一些消息。由於offet由consumer控制,因此Kafka broker是無狀態的,它不須要標記哪些消息被哪些consumer過,不須要經過broker去保證同一個consumer group只有一個consumer能消費某一條消息,所以也就不須要鎖機制,這也爲Kafka的高吞吐率提供了有力保障。      

Replication & Leader election

  Kafka從0.8開始提供partition級別的replication,replication的數量可在$KAFKA_HOME/config/server.properties中配置。

1
default.replication.factor = 1

 

  該 Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有必定影響的,但極大的加強了可用性。默認狀況下,Kafka的replication數量爲1。  每一個partition都有一個惟一的leader,全部的讀寫操做都在leader上完成,leader批量從leader上pull數據。通常狀況下partition的數量大於等於broker的數量,而且全部partition的leader均勻分佈在broker上。follower上的日誌和其leader上的徹底同樣。
  和大部分分佈式系統同樣,Kakfa處理失敗須要明肯定義一個broker是否alive。對於Kafka而言,Kafka存活包含兩個條件,一是它必須維護與Zookeeper的session(這個經過Zookeeper的heartbeat機制來實現)。二是follower必須可以及時將leader的writing複製過來,不能「落後太多」。
  leader會track「in sync」的node list。若是一個follower宕機,或者落後太多,leader將把它從」in sync」 list中移除。這裏所描述的「落後太多」指follower複製的消息落後於leader後的條數超過預約值,該值可在$KAFKA_HOME/config/server.properties中配置

1
2
3
4
5
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000

#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000

 

  須要說明的是,Kafka只解決」fail/recover」,不處理「Byzantine」(「拜占庭」)問題。
  一條消息只有被「in sync」 list裏的全部follower都從leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而形成數據丟失(consumer沒法消費這些數據)。而對於producer而言,它能夠選擇是否等待消息commit,這能夠經過request.required.acks來設置。這種機制確保了只要「in sync」 list有一個或以上的flollower,一條被commit的消息就不會丟失。
  這裏的複製機制即不是同步複製,也不是單純的異步複製。事實上,同步複製要求「活着的」follower都複製完,這條消息纔會被認爲commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka很是重要的一個特性)。而異步複製方式下,follower異步的從leader複製數據,數據只要被leader寫入log就被認爲已經commit,這種狀況下若是follwer都落後於leader,而leader忽然宕機,則會丟失數據。而Kafka的這種使用「in sync」 list的方式則很好的均衡了確保數據不丟失以及吞吐率。follower能夠批量的從leader複製數據,這樣極大的提升複製性能(批量寫磁盤),極大減小了follower與leader的差距(前文有說到,只要follower落後leader不太遠,則被認爲在「in sync」 list裏)。
  
  上文說明了Kafka是如何作replication的,另一個很重要的問題是當leader宕機了,怎樣在follower中選舉出新的leader。由於follower可能落後許多或者crash了,因此必須確保選擇「最新」的follower做爲新的leader。一個基本的原則就是,若是leader不在了,新的leader必須擁有原來的leader commit的全部消息。這就須要做一個折衷,若是leader在標明一條消息被commit前等待更多的follower確認,那在它die以後就有更多的follower能夠做爲新的leader,但這也會形成吞吐率的降低。
  一種很是經常使用的選舉leader的方式是「majority vote」(「少數服從多數」),但Kafka並未採用這種方式。這種模式下,若是咱們有2f+1個replica(包含leader和follower),那在commit以前必須保證有f+1個replica複製完消息,爲了保證正確選出新的leader,fail的replica不能超過f個。由於在剩下的任意f+1個replica裏,至少有一個replica包含有最新的全部消息。這種方式有個很大的優點,系統的latency只取決於最快的幾臺server,也就是說,若是replication factor是3,那latency就取決於最快的那個follower而非最慢那個。majority vote也有一些劣勢,爲了保證leader election的正常進行,它所能容忍的fail的follower個數比較少。若是要容忍1個follower掛掉,必需要有3個以上的replica,若是要容忍2個follower掛掉,必需要有5個以上的replica。也就是說,在生產環境下爲了保證較高的容錯程度,必需要有大量的replica,而大量的replica又會在大數據量下致使性能的急劇降低。這就是這種算法更多用在Zookeeper這種共享集羣配置的系統中而不多在須要存儲大量數據的系統中使用的緣由。例如HDFS的HA feature是基於majority-vote-based journal,可是它的數據存儲並無使用這種expensive的方式。
  實際上,leader election算法很是多,好比Zookeper的ZabRaftViewstamped Replication。而Kafka所使用的leader election算法更像微軟的PacificA算法。
  Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas) set,這個set裏的全部replica都跟上了leader,只有ISR裏的成員纔有被選爲leader的可能。在這種模式下,對於f+1個replica,一個Kafka topic能在保證不丟失已經ommit的消息的前提下容忍f個replica的失敗。在大多數使用場景中,這種模式是很是有利的。事實上,爲了容忍f個replica的失敗,majority vote和ISR在commit前須要等待的replica數量是同樣的,可是ISR須要的總的replica的個數幾乎是majority vote的一半。
  雖然majority vote與ISR相比有不需等待最慢的server這一優點,可是Kafka做者認爲Kafka能夠經過producer選擇是否被commit阻塞來改善這一問題,而且節省下來的replica和磁盤使得ISR模式仍然值得。
  
  上文提到,在ISR中至少有一個follower時,Kafka能夠確保已經commit的數據不丟失,但若是某一個partition的全部replica都掛了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:

  • 等待ISR中的任一個replica「活」過來,而且選它做爲leader
  • 選擇第一個「活」過來的replica(不必定是ISR中的)做爲leader

  這就須要在可用性和一致性當中做出一個簡單的平衡。若是必定要等待ISR中的replica「活」過來,那不可用的時間就可能會相對較長。並且若是ISR中的全部replica都沒法「活」過來了,或者數據都丟失了,這個partition將永遠不可用。選擇第一個「活」過來的replica做爲leader,而這個replica不是ISR中的replica,那即便它並不保證已經包含了全部已commit的消息,它也會成爲leader而做爲consumer的數據源(前文有說明,全部讀寫都由leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在之後的版本中,Kafka支持用戶經過配置選擇這兩種方式中的一種,從而根據不一樣的使用場景選擇高可用性仍是強一致性。
  
  上文說明了一個parition的replication過程,然爾Kafka集羣須要管理成百上千個partition,Kafka經過round-robin的方式來平衡partition從而避免大量partition集中在了少數幾個節點上。同時Kafka也須要平衡leader的分佈,儘量的讓全部partition的leader均勻分佈在不一樣broker上。另外一方面,優化leadership election的過程也是很重要的,畢竟這段時間相應的partition處於不可用狀態。一種簡單的實現是暫停宕機的broker上的全部partition,併爲之選舉leader。實際上,Kafka選舉一個broker做爲controller,這個controller經過watch Zookeeper檢測全部的broker failure,並負責爲全部受影響的parition選舉leader,再將相應的leader調整命令發送至受影響的broker,過程以下圖所示。
  kafka controller
  
  這樣作的好處是,能夠批量的通知leadership的變化,從而使得選舉過程成本更低,尤爲對大量的partition而言。若是controller失敗了,倖存的全部broker都會嘗試在Zookeeper中建立/controller->{this broker id},若是建立成功(只可能有一個建立成功),則該broker會成爲controller,若建立不成功,則該broker會等待新controller的命令。
  kafka controller failover

Consumer group

  (本節全部描述都是基於consumer hight level API而非low level API)。
  每個consumer實例都屬於一個consumer group,每一條消息只會被同一個consumer group裏的一個consumer實例消費。(不一樣consumer group能夠同時消費同一條消息)
  kafka consumer group
  
  不少傳統的message queue都會在消息被消費完後將消息刪除,一方面避免重複消費,另外一方面能夠保證queue的長度比較少,提升效率。而如上文所將,Kafka並不刪除已消費的消息,爲了實現傳統message queue消息只被消費一次的語義,Kafka保證保證同一個consumer group裏只有一個consumer會消費一條消息。與傳統message queue不一樣的是,Kafka還容許不一樣consumer group同時消費同一條消息,這一特性能夠爲消息的多元化處理提供了支持。實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的consumer在不一樣的consumer group便可。下圖展現了Kafka在Linkedin的一種簡化部署。
  kafka deployment in linkedin
  爲了更清晰展現Kafka consumer group的特性,筆者做了一項測試。建立一個topic (名爲topic1),建立一個屬於group1的consumer實例,並建立三個屬於group2的consumer實例,而後經過producer向topic1發送key分別爲1,2,3r的消息。結果發現屬於group1的consumer收到了全部的這三條消息,同時group2中的3個consumer分別收到了key爲1,2,3的消息。以下圖所示。
  kafka consumer group

Consumer Rebalance

  (本節所講述內容均基於Kafka consumer high level API)
  Kafka保證同一consumer group中只有一個consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每個consumer實例只會消費某一個或多個特定partition的數據,而某個partition的數據只會被某一個特定的consumer實例所消費。這樣設計的劣勢是沒法讓同一個consumer group裏的consumer均勻消費數據,優點是每一個consumer不用都跟大量的broker通訊,減小通訊開銷,同時也下降了分配難度,實現也更簡單。另外,由於同一個partition裏的數據是有序的,這種設計能夠保證每一個partition裏的數據也是有序被消費。
  若是某consumer group中consumer數量少於partition數量,則至少有一個consumer會消費多個partition的數據,若是consumer的數量與partition數量相同,則正好一個consumer消費一個partition的數據,而若是consumer的數量多於partition的數量時,會有部分consumer沒法消費該topic下任何一條消息。
  以下例所示,若是topic1有0,1,2共三個partition,當group1只有一個consumer(名爲consumer1)時,該 consumer可消費這3個partition的全部數據。
  kafka consumer group rebalance
  增長一個consumer(consumer2)後,其中一個consumer(consumer1)可消費2個partition的數據,另一個consumer(consumer2)可消費另一個partition的數據。
  kafka consumer group rebalance
  再增長一個consumer(consumer3)後,每一個consumer可消費一個partition的數據。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2
  kafka consumer group rebalance
  再增長一個consumer(consumer4)後,其中3個consumer可分別消費一個partition的數據,另一個consumer(consumer4)不能消費topic1任何數據。
  kafka consumer group rebalance
  此時關閉consumer1,剩下的consumer可分別消費一個partition的數據。
  kafka consumer group
  接着關閉consumer2,剩下的consumer3可消費2個partition,consumer4可消費1個partition。
  kafka consumer group
  再關閉consumer3,剩下的consumer4可同時消費topic1的3個partition。
  kafka consumer group

  consumer rebalance算法以下:   

  • Sort PT (all partitions in topic T)
  • Sort CG(all consumers in consumer group G)
  • Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
  • Remove current entries owned by Ci from the partition owner registry
  • Assign partitions from iN to (i+1)N-1 to consumer Ci
  • Add newly assigned partitions to the partition owner registry

  目前consumer rebalance的控制策略是由每個consumer經過Zookeeper完成的。具體的控制方式以下:

  • Register itself in the consumer id registry under its group.
  • Register a watch on changes under the consumer id registry.
  • Register a watch on changes under the broker id registry.
  • If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
  • Force itself to rebalance within in its consumer group.
      
      在這種策略下,每個consumer或者broker的增長或者減小都會觸發consumer rebalance。由於每一個consumer只負責調整本身所消費的partition,爲了保證整個consumer group的一致性,因此當一個consumer觸發了rebalance時,該consumer group內的其它全部consumer也應該同時觸發rebalance。

  目前(2015-01-19)最新版(0.8.2)Kafka採用的是上述方式。但該方式有不利的方面:

  • Herd effect
      任何broker或者consumer的增減都會觸發全部的consumer的rebalance
  • Split Brain
      每一個consumer分別單獨經過Zookeeper判斷哪些partition down了,那麼不一樣consumer從Zookeeper「看」到的view就可能不同,這就會形成錯誤的reblance嘗試。並且有可能全部的consumer都認爲rebalance已經完成了,但實際上可能並不是如此。

  根據Kafka官方文檔,Kafka做者正在考慮在還未發佈的0.9.x版本中使用中心協調器(coordinator)。大致思想是選舉出一個broker做爲coordinator,由它watch Zookeeper,從而判斷是否有partition或者consumer的增減,而後生成rebalance命令,並檢查是否這些rebalance在全部相關的consumer中被執行成功,若是不成功則重試,若成功則認爲這次rebalance成功(這個過程跟replication controller很是相似,因此我很奇怪爲何當初設計replication controller時沒有使用相似方式來解決consumer rebalance的問題)。流程以下:
  kafka coordinator     

消息Deliver guarantee

  經過上文介紹,想必讀者已經明天了producer和consumer是如何工做的,以及Kafka是如何作replication的,接下來要討論的是Kafka如何確保消息在producer和consumer之間傳輸。有這麼幾種可能的delivery guarantee:

  • At most once 消息可能會丟,但毫不會重複傳輸
  • At least one 消息毫不會丟,但可能會重複傳輸
  • Exactly once 每條消息確定會被傳輸一次且僅傳輸一次,不少時候這是用戶所想要的。
      
      Kafka的delivery guarantee semantic很是直接。當producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。可是若是producer發送數據給broker後,遇到的網絡問題而形成通訊中斷,那producer就沒法判斷該條消息是否已經commit。這一點有點像向一個自動生成primary key的數據庫表中插入數據。雖然Kafka沒法肯定網絡故障期間發生了什麼,可是producer能夠生成一種相似於primary key的東西,發生故障時冪等性的retry屢次,這樣就作到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),這一feature還並未實現,有但願在Kafka將來的版本中實現。(因此目前默認狀況下一條消息從producer和broker是確保了At least once,但可經過設置producer異步發送實現At most once)。
      接下來討論的是消息從broker到consumer的delivery guarantee semantic。(僅針對Kafka consumer high level API)。consumer在從broker讀取消息後,能夠選擇commit,該操做會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit以後的開始位置相同。固然能夠將consumer設置爲autocommit,即consumer一旦讀到數據當即自動commit。若是隻討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際上實際使用中consumer並不是讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
  • 讀完消息先commit再處理消息。這種模式下,若是consumer在commit後還沒來得及處理消息就crash了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於At most once
  • 讀完消息先處理再commit。這種模式下,若是處理完了消息在commit以前consumer crash了,下次從新開始工做時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應於At least once。在不少狀況使用場景下,消息都有一個primary key,因此消息的處理每每具備冪等性,即屢次處理這一條消息跟只處理一次是等效的,那就能夠認爲是Exactly once。(人個感受這種說法有些牽強,畢竟它不是Kafka自己提供的機制,並且primary key自己不保證操做的冪等性。並且實際上咱們說delivery guarantee semantic是討論被處理多少次,而非處理結果怎樣,由於處理方式多種多樣,咱們的系統不該該把處理過程的特性–如是否冪等性,當成Kafka自己的feature)
  • 若是必定要作到Exactly once,就須要協調offset和實際操做的輸出。精典的作法是引入兩階段提交。若是能讓offset和操做輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,由於許多輸出系統可能不支持兩階段提交。好比,consumer拿到數據後可能把數據放到HDFS,若是把最新的offset和數據自己一塊兒寫到HDFS,那就能夠保證數據的輸出和offset的更新要麼都完成,要麼都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,沒法存於HDFS,而low level API的offset是由本身去維護的,能夠將之存於HDFS中)
      總之,Kafka默認保證At least once,而且容許經過設置producer異步提交來實現At most once。而Exactly once要求與目標存儲系統協做,幸運的是Kafka提供的offset可使用這種方式很是直接很是容易。

Benchmark

  紙上得來終覺淺,絕知些事要躬行。筆者但願能親自測一下Kafka的性能,而非從網上找一些測試數據。因此筆者曾在0.8發佈前兩個月作過詳細的Kafka0.8性能測試,不過很惋惜測試報告不慎丟失。所幸在網上找到了Kafka的創始人之一的Jay Kreps的bechmark。如下描述皆基於該benchmark。(該benchmark基於Kafka0.8.1)

測試環境

  該benchmark用到了六臺機器,機器配置以下

  • Intel Xeon 2.5 GHz processor with six cores
  • Six 7200 RPM SATA drives
  • 32GB of RAM
  • 1Gb Ethernet
      
      這6臺機器其中3臺用來搭建Kafka broker集羣,另外3臺用來安裝Zookeeper及生成測試數據。6個drive都直接以非RAID方式掛載。實際上kafka對機器的需求與Hadoop的相似。

Producer吞吐率

  該項測試只測producer的吞吐率,也就是數據只被持久化,沒有consumer讀數據。

1個producer線程,無replication

  在這一測試中,建立了一個包含6個partition且沒有replication的topic。而後經過一個線程儘量快的生成50 million條比較短(payload100字節長)的消息。測試結果是821,557 records/second78.3MB/second)。
  之因此使用短消息,是由於對於消息系統來講這種使用場景更難。由於若是使用MB/second來表徵吞吐率,那發送長消息無疑能使得測試結果更好。
  整個測試中,都是用每秒鐘delivery的消息的數量乘以payload的長度來計算MB/second的,沒有把消息的元信息算在內,因此實際的網絡使用量會比這個大。對於本測試來講,每次還需傳輸額外的22個字節,包括一個可選的key,消息長度描述,CRC等。另外,還包含一些請求相關的overhead,好比topic,partition,acknowledgement等。這就致使咱們比較難判斷是否已經達到網卡極限,可是把這些overhead都算在吞吐率裏面應該更合理一些。所以,咱們已經基本達到了網卡的極限。
  初步觀察此結果會認爲它比人們所預期的要高不少,尤爲當考慮到Kafka要把數據持久化到磁盤當中。實際上,若是使用隨機訪問數據系統,好比RDBMS,或者key-velue store,可預期的最高訪問頻率大概是5000到50000個請求每秒,這和一個好的RPC層所能接受的遠程請求量差很少。而該測試中遠超於此的緣由有兩個。

  • Kafka確保寫磁盤的過程是線性磁盤I/O,測試中使用的6塊廉價磁盤線性I/O的最大吞吐量是822MB/second,這已經遠大於1Gb網卡所能帶來的吞吐量了。許多消息系統把數據持久化到磁盤當成是一個開銷很大的事情,這是由於他們對磁盤的操做都不是線性I/O。
  • 在每個階段,Kafka都儘可能使用批量處理。若是想了解批處理在I/O操做中的重要性,能夠參考David Patterson的」Latency Lags Bandwidth

1個producer線程,3個異步replication

  該項測試與上一測試基本同樣,惟一的區別是每一個partition有3個replica(因此網絡傳輸的和寫入磁盤的總的數據量增長了3倍)。每個broker即要寫做爲leader的partition,也要讀(從leader讀數據)寫(將數據寫到磁盤)做爲follower的partition。測試結果爲786,980 records/second75.1MB/second)。
  該項測試中replication是異步的,也就是說broker收到數據並寫入本地磁盤後就acknowledge producer,而沒必要等全部replica都完成replication。也就是說,若是leader crash了,可能會丟掉一些最新的還未備份的數據。但這也會讓message acknowledgement延遲更少,實時性更好。
  這項測試說明,replication能夠很快。整個集羣的寫能力可能會因爲3倍的replication而只有原來的三分之一,可是對於每個producer來講吞吐率依然足夠好。   

1個producer線程,3個同步replication

  該項測試與上一測試的惟一區別是replication是同步的,每條消息只有在被in sync集合裏的全部replica都複製過去後纔會被置爲committed(此時broker會向producer發送acknowledgement)。在這種模式下,Kafka能夠保證即便leader crash了,也不會有數據丟失。測試結果爲421,823 records/second40.2MB/second)。
  Kafka同步複製與異步複製並無本質的不一樣。leader會始終track follower replica從而監控它們是否還alive,只有全部in sync集合裏的replica都acknowledge的消息纔可能被consumer所消費。而對follower的等待影響了吞吐率。能夠經過增大batch size來改善這種狀況,但爲了不特定的優化而影響測試結果的可比性,本次測試並無作這種調整。   

3個producer,3個異步replication

  該測試至關於把上文中的1個producer,複製到了3臺不一樣的機器上(在1臺機器上跑多個實例對吞吐率的增長不會有太大幫忙,由於網卡已經基本飽和了),這3個producer同時發送數據。整個集羣的吞吐率爲2,024,032 records/second193,0MB/second)。

Producer Throughput Vs. Stored Data

  消息系統的一個潛在的危險是當數據能都存於內存時性能很好,但當數據量太大沒法徹底存於內存中時(而後不少消息系統都會刪除已經被消費的數據,但當消費速度比生產速度慢時,仍會形成數據的堆積),數據會被轉移到磁盤,從而使得吞吐率降低,這又反過來形成系統沒法及時接收數據。這樣就很是糟糕,而實際上不少情景下使用queue的目的就是解決數據消費速度和生產速度不一致的問題。
  但Kafka不存在這一問題,由於Kafka始終以O(1)的時間複雜度將數據持久化到磁盤,因此其吞吐率不受磁盤上所存儲的數據量的影響。爲了驗證這一特性,作了一個長時間的大數據量的測試,下圖是吞吐率與數據量大小的關係圖。
  kafka throughput
  上圖中有一些variance的存在,並能夠明顯看到,吞吐率並不受磁盤上所存數據量大小的影響。實際上從上圖能夠看到,當磁盤數據量達到1TB時,吞吐率和磁盤數據只有幾百MB時沒有明顯區別。
  這個variance是由Linux I/O管理形成的,它會把數據緩存起來再批量flush。上圖的測試結果是在生產環境中對Kafka集羣作了些tuning後獲得的,這些tuning方法可參考這裏。   

consumer吞吐率

  須要注意的是,replication factor並不會影響consumer的吞吐率測試,由於consumer只會從每一個partition的leader讀數據,而與replicaiton factor無關。一樣,consumer吞吐率也與同步複製仍是異步複製無關。   

1個consumer

  該測試從有6個partition,3個replication的topic消費50 million的消息。測試結果爲940,521 records/second89.7MB/second)。
  能夠看到,Kafkar的consumer是很是高效的。它直接從broker的文件系統裏讀取文件塊。Kafka使用sendfile API來直接經過操做系統直接傳輸,而不用把數據拷貝到用戶空間。該項測試實際上從log的起始處開始讀數據,因此它作了真實的I/O。在生產環境下,consumer能夠直接讀取producer剛剛寫下的數據(它可能還在緩存中)。實際上,若是在生產環境下跑I/O stat,你能夠看到基本上沒有物理「讀」。也就是說生產環境下consumer的吞吐率會比該項測試中的要高。

3個consumer

  將上面的consumer複製到3臺不一樣的機器上,而且並行運行它們(從同一個topic上消費數據)。測試結果爲2,615,968 records/second249.5MB/second)。
  正如所預期的那樣,consumer的吞吐率幾乎線性增漲。   

Producer and Consumer

  上面的測試只是把producer和consumer分開測試,而該項測試同時運行producer和consumer,這更接近使用場景。實際上目前的replication系統中follower就至關於consumer在工做。
  該項測試,在具備6個partition和3個replica的topic上同時使用1個producer和1個consumer,而且使用異步複製。測試結果爲795,064 records/second75.8MB/second)。
  能夠看到,該項測試結果與單獨測試1個producer時的結果幾乎一致。因此說consumer很是輕量級。   

消息長度對吞吐率的影響

  上面的全部測試都基於短消息(payload 100字節),而正如上文所說,短消息對Kafka來講是更難處理的使用方式,能夠預期,隨着消息長度的增大,records/second會減少,但MB/second會有所提升。下圖是records/second與消息長度的關係圖。
  kafka throughput
  正如咱們所預期的那樣,隨着消息長度的增長,每秒鐘所能發送的消息的數量逐漸減少。可是若是看每秒鐘發送的消息的總大小,它會隨着消息長度的增長而增長,以下圖所示。
  kafka benchmark
  從上圖能夠看出,當消息長度爲10字節時,由於要頻繁入隊,花了太多時間獲取鎖,CPU成了瓶頸,並不能充分利用帶寬。但從100字節開始,咱們能夠看到帶寬的使用逐漸趨於飽和(雖然MB/second仍是會隨着消息長度的增長而增長,但增長的幅度也愈來愈小)。   

端到端的Latency

  上文中討論了吞吐率,那消息傳輸的latency如何呢?也就是說消息從producer到consumer須要多少時間呢?該項測試建立1個producer和1個consumer並反覆計時。結果是,2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile)
  (這裏並無說明topic有多少個partition,也沒有說明有多少個replica,replication是同步仍是異步。實際上這會極大影響producer發送的消息被commit的latency,而只有committed的消息才能被consumer所消費,因此它會最終影響端到端的latency)   

重現該benchmark

  若是讀者想要在本身的機器上重現本次benchmark測試,能夠參考本次測試的配置和所使用的命令
  實際上Kafka Distribution提供了producer性能測試工具,可經過bin/kafka-producer-perf-test.sh腳原本啓動。所使用的命令以下
  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Producer
Setup
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1
bin/kafka-topics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3

Single thread, no replication

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Single-thread, async 3x replication

bin/kafktopics.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --create --topic test --partitions 6 --replication-factor 3
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test6 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Single-thread, sync 3x replication

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000

Three Producers, 3x async replication
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Throughput Versus Stored Data

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Effect of message size

for i in 10 100 1000 10000 100000;
do
echo ""
echo $i
bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test $((1000*1024*1024/$i)) $i -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=128000
done;

Consumer
Consumer throughput

bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1

3 Consumers

On three servers, run:
bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1

End-to-end Latency

bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency esv4-hcl198.grid.linkedin.com:9092 esv4-hcl197.grid.linkedin.com:2181 test 5000

Producer and consumer

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 50000000 100 -1 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

bin/kafka-consumer-perf-test.sh --zookeeper esv4-hcl197.grid.linkedin.com:2181 --messages 50000000 --topic test --threads 1

 

  broker配置以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost

# The number of threads handling network requests
num.network.threads=4

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dirs=/grid/a/dfs-data/kafka-logs,/grid/b/dfs-data/kafka-logs,/grid/c/dfs-data/kafka-logs,/grid/d/dfs-data/kafka-logs,/grid/e/dfs-data/kafka-logs,/grid/f/dfs-data/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=8

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
# 3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=esv4-hcl197.grid.linkedin.com:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false

replica.lag.max.messages=10000000

 

  讀者也可參考另一份Kafka性能測試報告

相關文章
相關標籤/搜索