原創文章,同步首發自做者我的博客。轉載請務必在文章開頭處以超連接形式註明出處http://www.jasongj.com/kafka/high_throughput/java
上一篇文章《Kafka設計解析(五)- Kafka性能測試方法及Benchmark報告》從測試角度說明了Kafka的性能。本文從宏觀架構層面和具體實現層面分析了Kafka如何實現高性能。react
Kafka是一個Pub-Sub的消息系統,不管是發佈仍是訂閱,都須指定Topic。如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Topic只是一個邏輯的概念。每一個Topic都包含一個或多個Partition,不一樣Partition可位於不一樣節點。同時Partition在物理上對應一個本地文件夾,每一個Partition包含一個或多個Segment,每一個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,能夠把一個Partition看成一個很是長的數組,可經過這個「數組」的索引(offset)去訪問其數據。數組
一方面,因爲不一樣Partition可位於不一樣機器,所以能夠充分利用集羣優點,實現機器間的並行處理。另外一方面,因爲Partition在物理上對應一個文件夾,即便多個Partition位於同一個節點,也可經過配置讓同一節點上的不一樣Partition置於不一樣的disk drive上,從而實現磁盤間的並行處理,充分發揮多磁盤的優點。網絡
利用多磁盤的具體方法是,將不一樣磁盤mount到不一樣目錄,而後在server.properties中,將log.dirs
設置爲多目錄(用逗號分隔)。Kafka會自動將全部Partition儘量均勻分配到不一樣目錄也即不一樣目錄(也即不一樣disk)上。架構
注:雖然物理上最小單位是Segment,但Kafka並不提供同一Partition內不一樣Segment間的並行處理。由於對於寫而言,每次只會寫Partition內的一個Segment,而對於讀而言,也只會順序讀取同一Partition內的不一樣Segment。併發
如同《Kafka設計解析(四)- Kafka Consumer設計解析》一文所述,多Consumer消費同一個Topic時,同一條消息只會被同一Consumer Group內的一個Consumer所消費。而數據並不是按消息爲單位分配,而是以Partition爲單位分配,也即同一個Partition的數據只會被一個Consumer所消費(在不考慮Rebalance的前提下)。異步
若是Consumer的個數多於Partition的個數,那麼會有部分Consumer沒法消費該Topic的任何數據,也即當Consumer個數超過Partition後,增長Consumer並不能增長並行度。socket
簡而言之,Partition個數決定了可能的最大並行度。以下圖所示,因爲Topic 2只包含3個Partition,故group2中的Consumer 三、Consumer 四、Consumer 5 可分別消費1個Partition的數據,而Consumer 6消費不到Topic 2的任何數據。
分佈式
以Spark消費Kafka數據爲例,若是所消費的Topic的Partition數爲N,則有效的Spark最大並行度也爲N。即便將Spark的Executor數設置爲N+M,最多也只有N個Executor可同時處理該Topic的數據。ide
CAP理論是指,分佈式系統中,一致性、可用性和分區容忍性最多隻能同時知足兩個。
一致性
可用性
分區容忍性
通常而言,都要求保證分區容忍性。因此在CAP理論下,更多的是須要在可用性和一致性之間作權衡。
Master-Slave
WNR
Paxos及其變種
基於ISR的數據複製方案
如《Kafka High Availability(上)》一文所述,Kafka的數據複製是以Partition爲單位的。而多個備份間的數據複製,經過Follower向Leader拉取數據完成。從一這點來說,Kafka的數據複製方案接近於上文所講的Master-Slave方案。不一樣的是,Kafka既不是徹底的同步複製,也不是徹底的異步複製,而是基於ISR的動態複製方案。
ISR,也即In-sync Replica。每一個Partition的Leader都會維護這樣一個列表,該列表中,包含了全部與之同步的Replica(包含Leader本身)。每次數據寫入時,只有ISR中的全部Replica都複製完,Leader纔會將其置爲Commit,它才能被Consumer所消費。
這種方案,與同步複製很是接近。但不一樣的是,這個ISR是由Leader動態維護的。若是Follower不能緊「跟上」Leader,它將被Leader從ISR中移除,待它又從新「跟上」Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會將最新的ISR持久化到Zookeeper中。
至於如何判斷某個Follower是否「跟上」Leader,不一樣版本的Kafka的策略稍微有些區別。
replica.lag.time.max.ms
時間內未向Leader發送Fetch請求(也即數據複製請求),則Leader會將其從ISR中移除。若是某Follower持續向Leader發送Fetch請求,可是它與Leader的數據差距在replica.lag.max.messages
以上,也會被Leader從ISR中移除。replica.lag.max.messages
被移除,故Leader再也不考慮Follower落後的消息條數。另外,Leader不只會判斷Follower是否在replica.lag.time.max.ms
時間內向其發送Fetch請求,同時還會考慮Follower是否在該時間內與之保持同步。對於0.8.*版本的replica.lag.max.messages
參數,不少讀者曾留言提問,既然只有ISR中的全部Replica複製完後的消息才被認爲Commit,那爲什麼會出現Follower與Leader差距過大的狀況。緣由在於,Leader並不須要等到前一條消息被Commit才接收後一條消息。事實上,Leader能夠按順序接收大量消息,最新的一條消息的Offset被記爲High Wartermark。而只有被ISR中全部Follower都複製過去的消息纔會被Commit,Consumer只能消費被Commit的消息。因爲Follower的複製是嚴格按順序的,因此被Commit的消息以前的消息確定也已經被Commit過。換句話說,High Watermark標記的是Leader所保存的最新消息的offset,而Commit Offset標記的是最新的可被消費的(已同步到ISR中的Follower)消息。而Leader對數據的接收與Follower對數據的複製是異步進行的,所以會出現Commit Offset與High Watermark存在必定差距的狀況。0.8.*版本中replica.lag.max.messages
限定了Leader容許的該差距的最大值。
Kafka基於ISR的數據複製方案原理以下圖所示。
如上圖所示,在第一步中,Leader A總共收到3條消息,故其high watermark爲3,但因爲ISR中的Follower只同步了第1條消息(m1),故只有m1被Commit,也即只有m1可被Consumer消費。此時Follower B與Leader A的差距是1,而Follower C與Leader A的差距是2,均未超過默認的replica.lag.max.messages
,故得以保留在ISR中。在第二步中,因爲舊的Leader A宕機,新的Leader B在replica.lag.time.max.ms
時間內未收到來自A的Fetch請求,故將A從ISR中移除,此時ISR={B,C}。同時,因爲此時新的Leader B中只有2條消息,並未包含m3(m3從未被任何Leader所Commit),因此m3沒法被Consumer消費。第四步中,Follower A恢復正常,它先將宕機前未Commit的全部消息所有刪除,而後從最後Commit過的消息的下一條消息開始追趕新的Leader B,直到它「遇上」新的Leader,才被從新加入新的ISR中。
Majority Quorum
方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。min.insync.replicas
參數指定了Broker所要求的ISR最小長度,默認值爲1。也即極限狀況下ISR能夠只包含Leader。但此時若是Leader宕機,則該Partition不可用,可用性得不到保證。acks
參數指定最少須要多少個Replica確認收到該消息才視爲該消息發送成功。acks
的默認值是1,即Leader收到該消息後當即告訴Producer收到該消息,此時若是在ISR中的消息複製完該消息前Leader宕機,那該條消息會丟失。而若是將該值設置爲0,則Producer發送完數據後,當即認爲該數據發送成功,不做任何等待,而實際上該數據可能發送失敗,而且Producer的Retry機制將不生效。更推薦的作法是,將acks
設置爲all
或者-1
,此時只有ISR中的全部Replica都收到該數據(也即該消息被Commit),Leader纔會告訴Producer該消息發送成功,從而保證不會有未知的數據丟失。根據《一些場景下順序寫磁盤快於隨機寫內存》所述,將寫磁盤的過程變爲順序寫,可極大提升對磁盤的利用率。
Kafka的整個設計中,Partition至關於一個很是長的數組,而Broker接收到的全部消息順序寫入這個大數組中。同時Consumer經過Offset順序消費這些數據,而且不刪除已經消費的數據,從而避免了隨機寫磁盤的過程。
因爲磁盤有限,不可能保存全部數據,實際上做爲消息系統Kafka也不必保存全部數據,須要刪除舊的數據。而這個刪除過程,並不是經過使用「讀-寫」模式去修改文件,而是將Partition分爲多個Segment,每一個Segment對應一個物理文件,經過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操做。
經過以下代碼可知,Kafka刪除Segment的方式,是直接刪除Segment對應的整個log文件和整個index文件而非刪除文件中的部份內容。
/** * Delete this log segment from the filesystem. * * @throws KafkaStorageException if the delete fails. */ def delete() { val deletedLog = log.delete() val deletedIndex = index.delete() val deletedTimeIndex = timeIndex.delete() if(!deletedLog && log.file.exists) throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") if(!deletedIndex && index.file.exists) throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") if(!deletedTimeIndex && timeIndex.file.exists) throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.") }
使用Page Cache的好處以下
Broker收到數據後,寫磁盤時只是將數據寫入Page Cache,並不保證數據必定徹底寫入磁盤。從這一點看,可能會形成機器宕機時,Page Cache內的數據未寫入磁盤從而形成數據丟失。可是這種丟失只發生在機器斷電等形成操做系統不工做的場景,而這種場景徹底能夠由Kafka層面的Replication機制去解決。若是爲了保證這種狀況下數據不丟失而強制將Page Cache中的數據Flush到磁盤,反而會下降性能。也正因如此,Kafka雖然提供了flush.messages
和flush.ms
兩個參數將Page Cache中的數據強制Flush到磁盤,可是Kafka並不建議使用。
若是數據消費速度與生產速度至關,甚至不須要經過物理磁盤交換數據,而是直接經過Page Cache交換數據。同時,Follower從Leader Fetch數據時,也可經過Page Cache完成。下圖爲某Partition的Leader節點的網絡/磁盤讀寫信息。
從上圖能夠看到,該Broker每秒經過網絡從Producer接收約35MB數據,雖然有Follower從該Broker Fetch數據,可是該Broker基本無讀磁盤。這是由於該Broker直接從Page Cache中將數據取出返回給了Follower。
Broker的log.dirs
配置項,容許配置多個文件夾。若是機器上有多個Disk Drive,可將不一樣的Disk掛載到不一樣的目錄,而後將這些目錄都配置到log.dirs
裏。Kafka會盡量將不一樣的Partition分配到不一樣的目錄,也即不一樣的Disk上,從而充分利用了多Disk的優點。
Kafka中存在大量的網絡數據持久化到磁盤(Producer到Broker)和磁盤文件經過網絡發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的總體吞吐量。
以將磁盤文件經過網絡發送爲例。傳統模式下,通常使用以下僞代碼所示的方法先將文件數據讀入內存,而後經過Socket將內存中的數據發送出去。
buffer = File.read Socket.send(buffer)
這一過程實際上發生了四次數據拷貝。首先經過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),而後應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接着用戶程序經過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最後經過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨着四次上下文切換,以下圖所示。
Linux 2.4+內核經過sendfile
系統調用,提供了零拷貝。數據經過DMA拷貝到內核態Buffer後,直接經過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減小數據拷貝外,由於整個讀文件-網絡發送由一個sendfile
調用完成,整個過程只有兩次上下文切換,所以大大提升了性能。零拷貝過程以下圖所示。
從具體實現來看,Kafka的數據傳輸經過TransportLayer來完成,其子類PlaintextTransportLayer
經過Java NIO的FileChannel的transferTo
和transferFrom
方法實現零拷貝,以下所示。
@Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel); }
注: transferTo
和transferFrom
並不保證必定能使用零拷貝。其實是否能使用零拷貝與操做系統相關,若是操做系統提供sendfile
這樣的零拷貝系統調用,則這兩個方法會經過這樣的系統調用充分利用零拷貝的優點,不然並不能經過這兩個方法自己實現零拷貝。
批處理是一種經常使用的用於提升I/O性能的方式。對Kafka而言,批處理既減小了網絡傳輸的Overhead,又提升了寫磁盤的效率。
Kafka 0.8.1及之前的Producer區分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage做爲參數,一次發送一條消息。另外一種是接受一批KeyedMessage做爲參數,一次性發送多條消息。而對於異步發送而言,不管是使用哪一個send方法,實現上都不會當即將消息發送給Broker,而是先存到內部的隊列中,直到消息條數達到閾值或者達到指定的Timeout才真正的將消息發送出去,從而實現了消息的批量發送。
Kafka 0.8.2開始支持新的Producer API,將同步Producer和異步Producer結合。雖然從send接口來看,一次只能發送一個ProducerRecord,而不能像以前版本的send方法同樣接受消息列表,可是send方法並不是當即將消息發送出去,而是經過batch.size
和linger.ms
控制實際發送頻率,從而實現批量發送。
因爲每次網絡傳輸,除了傳輸消息自己之外,還要傳輸很是多的網絡協議自己的一些內容(稱爲Overhead),因此將多條消息合併到一塊兒傳輸,可有效減小網絡傳輸的Overhead,進而提升了傳輸效率。
從零拷貝章節的圖中能夠看到,雖然Broker持續從網絡接收數據,可是寫磁盤並不是每秒都在發生,而是間隔一段時間寫一次磁盤,而且每次寫磁盤的數據量都很是大(最高達到718MB/S)。
Kafka從0.7開始,即支持將數據壓縮後再傳輸給Broker。除了能夠將每條消息單獨壓縮而後傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一塊兒壓縮後傳輸。數據壓縮的一個基本原理是,重複數據越多壓縮效果越好。所以將整個Batch的數據一塊兒壓縮能更大幅度減少數據量,從而更大程度提升網絡傳輸效率。
Broker接收消息後,並不直接解壓縮,而是直接將消息以壓縮後的形式持久化到磁盤。Consumer Fetch到數據後再解壓縮。所以Kafka的壓縮不只減小了Producer到Broker的網絡傳輸負載,同時也下降了Broker磁盤操做的負載,也下降了Consumer與Broker間的網絡傳輸量,從而極大得提升了傳輸效率,提升了吞吐量。
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器便可。所以用戶能夠經過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減小實際網絡傳輸和磁盤存儲的數據規模,從而提升吞吐率。這裏要注意,若是使用的序列化方法太慢,即便壓縮比很是高,最終的效率也不必定高。