kafka在設計之初就須要考慮如下4個方面的問題:java
對於任何一個消息引擎而言,吞吐量都是相當重要的性能指標。那麼何爲吞吐量呢?一般來講,吞吐量是某種處理能力的最大值。而對於Kafka而言,它的吞吐量就是每秒可以處理的消息數或者每秒可以處理的字節數。很顯然,咱們天然但願消息引擎的吞吐量越大越好。node
消息引擎系統還有一個名爲延時的性能指標。它衡量的是一段時間間隔,多是發出某個操做與接收到操做響應(response)之間的時間,或者是在系統中致使某些物理變動的起始時刻與變動正式生效時刻之間的間隔。對於Kafka而言,延時能夠表示客戶端發起請求與服務器處理請求併發送響應給客戶端之間的這一段時間。顯而易見,延時間隔越短越好。算法
在實際使用場景中,這兩個指標一般是一對矛盾體,即調優其中一個指標一般會使另外一個指標變差。在採起必定延時的同時採用批處理的思想,即一小批一小批(micro-batch)地發送,則會大大提高吞吐量。apache
Kafka是如何作到高吞吐量、低延時的呢?首先,Kafka的寫入操做是很快的,這主要得益於它對磁盤的使用方法的不一樣。雖然Kafka會持久化全部數據到磁盤,但本質上每次寫入操做其實都只是把數據寫入到操做系統的頁緩存(page cache)中,而後由操做系統自行決定何時把頁緩存中的數據寫回磁盤上。這樣的設計有3個主要優點。後端
請特別留意上面的第3點。對於普通的物理磁盤(非固態硬盤)而言,咱們老是認爲磁盤的讀/寫操做是很慢的。事實上普通SAS磁盤隨機讀/寫的吞吐量的確是很慢的,可是磁盤的順序讀/寫操做實際上是很是快的,它的速度甚至能夠匹敵內存的隨機I/O速度,如圖1.5所示。隨機內存I/O的速度是36.7MB/s,而順序磁盤I/O的速度甚至達到了52.2MB/s,絲絕不遜於內存的I/O操做性能。api
鑑於這一事實,Kafka在設計時採用了追加寫入消息的方式,即只能在日誌文件末尾追加寫入新的消息,且不容許修改已寫入的消息,所以它屬於典型的磁盤順序訪問型操做,因此Kafka消息發送的吞吐量是很高的。在實際使用過程當中能夠很輕鬆地作到每秒寫入幾萬甚至幾十萬條消息。數組
下面咱們來看看Kafka的消費端是如何作到高吞吐量、低延時的。以前提到了Kafka是把消息寫入操做系統的頁緩存中的。那麼一樣地,Kafka在讀取消息時會首先嚐試從OS的頁緩存中讀取,若是命中便把消息經頁緩存直接發送到網絡的Socket上。這個過程就是利用Linux平臺的sendfile系統調用作到的,而這種技術就是大名鼎鼎的零拷貝(Zero Copy)技術。緩存
總結一下,Kafka就是依靠下列4點達到了高吞吐量、低延時的設計目標的。安全
Kafka是要持久化消息的,並且要把消息持久化到磁盤上。這樣作的好處以下:服務器
另外,Kafka實現持久化的設計也有新穎之處。普通的系統在實現持久化時可能會先儘可能使用內存,當內存資源耗盡時,再一次性地把數據「刷盤」;而Kafka則反其道而行之,全部數據都會當即被寫入文件系統的持久化日誌中,以後Kafka服務器纔會返回結果給客戶端通知它們消息已被成功寫入。這樣作既實時保存了數據,又減小了Kafka程序對於內存的消耗,從而將節省出的內存留給頁緩存使用,更進一步地提高了 總體性能。
何爲負載均衡?顧名思義就是讓系統的負載根據必定的規則均衡地分配在全部參與工做的服務器上,從而最大限度地提高系統總體的運行效率。具體到Kafka來講,默認狀況下Kafka的每臺服務器都有均等的機會爲Kafka的客戶提供服務,能夠把負載分散到全部集羣中的機器上,避免出現「耗盡某臺服務器」的狀況發生。
Kafka實現負載均衡其實是經過智能化的分區領導者選舉(partition leader election)來實現的。Kafka默認提供了很智能的leader選舉算法,能夠在集羣的全部機器上以均等機會分散各個partition的leader,從而總體上實現了負載均衡。
除了負載均衡,完備的分佈式系統還須要支持故障轉移。所謂故障轉移,是指當服務器意外停止時,整個集羣能夠快速地檢測到該失效(failure),並當即將該服務器上的應用或服務自動轉移到其餘服務器上。故障轉移一般是以「心跳」或「會話」的機制來實現的,即只要主服務器與備份服務器之間的心跳沒法維持或主服務器註冊到服務器中心的會話超時過時了,那麼就認爲主服務器已沒法正常運行,集羣會自動啓動某個備份服務器來替代主服務器的工做。
Kafka服務器支持故障轉移的方式就是使用會話機制。每臺Kafka服務器啓動後會以會話的形式把本身註冊到Zookeeper服務器上。一但該服務器運轉出現問題,與Zookeeper的會話便不能維持從而超時失效,此時Kafka集羣會選舉出另外一臺服務器來完成代替這臺服務器繼續提供服務,如圖1.8所示:
所謂伸縮性,表示向分佈式系統中增長額外的計算資源(好比CPU、內存、存儲或帶寬)時吞吐量提高的能力。阻礙線性擴容的一個很常見的因素就是狀態的保存。咱們知道,不管是哪類分佈式系統,集羣中的每臺服務器必定會維護不少內部狀態。若是由服務器本身來保存這些狀態信息,則必需要處理一致性的問題。相反,若是服務器是無狀態的,狀態的保存和管理交於專門的協調服務來作(好比Zookeeper),那麼整個集羣的服務器之間就無須繁重的狀態共享,這極大地下降了維護複雜度。假若要擴容集羣節點,只需簡單地啓動新的節點機器進行自動負載均衡就能夠了。
Kafka正是採用了這樣的思想——每臺Kafka服務器上的狀態統一交由ZooKeeper保管。擴展Kafka集羣也只須要一步:啓動新的Kafka服務器便可。
在Kafka 0.10.0.0版本正式推出了Kafka Streams,即流式處理組件。自此Kafka正式成爲了一個流式處理框架,而不只僅是消息引擎了。Kafka的架構圖以下:
不論Kafka如何變遷,其核心架構老是相似的,無非是生產一些消息而後再消費一些消息。若是總結起來那就是三句話:
Kafka服務器即broker。Kafka有一些基本術語須要掌握,這是後續學習Kafka的基礎。首先,Kafka是分佈式的集羣。一個集羣可能由一臺或多臺機器組成。Kafka集羣中保存的每條消息都歸屬於一個topic。本節將分別從消息、topic、partition和replica幾個方面詳細介紹Kafka的基本概念。
Kafka的消息格式由不少字段組成,其中的不少字段都是用於管理消息的元數據字段,對用戶來講是徹底透明的。Kafka消息格式共經歷過3次變遷,它們被分別稱爲V0,V1和V2版本。目前大部分用戶使用的應該仍是V1版本的消息格式。V1版本消息的完整格式如圖1.10所示。
如圖1.10所示,消息由消息頭部、key和value組成。消息頭部包括消息的CRC碼、消息版本號、屬性、時間戳、鍵長度和消息體長度等信息。其實,對於普通用戶來講,掌握如下3個字段的含義就足夠通常的使用了。
另外這裏單獨提一下消息的屬性字段,Kafka爲該字段分配了1字節。目前只使用了最低的3位用於保存消息的壓縮類型,其他5位還沒有使用。當前只支持4種壓縮類型:0(無壓縮)、1(GZIP)、2(Snappay)和3(LZ4)。
其次,Kafka使用緊湊的二進制字節數組來保存上面這些字段,也就是說沒有任何多餘的比特位浪費。在Java內存模型(Java memory model,JMM)中,對象保存的開銷其實至關大,對於小對象而言,一般要花費2倍的空間來保存數據(甚至更糟)。另外,隨着堆上數據量愈來愈大,GC的性能會降低不少,從而總體上拖慢了系統的吞吐量。所以Kafka在消息設計時特地避開了繁重的Java堆上內存分配,直接使用緊湊二進制字節數組ByteBuffer而不是獨立的對象,所以咱們至少可以訪問多一倍的可用內存。按照Kafka官網的說法,在一臺32GB內存的機器上,Kafka幾乎能用到28~30GB的物理內存,同時還沒必要擔憂GC的糟糕性能。
同時,大量使用頁緩存而非堆內存還有一個好處——當出現Kafka broker進程崩潰時,堆內存上的數據也一併消失,但頁緩存的數據依然存在。下次Kafka broker重啓後能夠繼續提供服務,不須要再單獨「熱」緩存了。
在本節中咱們詳細說說這兩個Kafka核心概念。
從概念上來講,topic只是一個邏輯概念,表明了一類消息,也能夠認爲是消息被髮送到的地方。一般咱們能夠使用topic來區分實際業務,好比業務A使用一個topic,業務B使用另一個topic。
Kafka中的topic一般都會被多個消費者訂閱,所以出於性能的考量,Kafka並非topic-message的兩級結構,而是採用了topic-partition-message的三級結構來分散負載。從本質上說,每一個Kafka topic都由若干個partition組成,如圖1.11所示。
這張來自Kafka官網的topic和partition關係圖很是清楚地代表了它們兩者之間的關係:topic是由多個partition組成的。而Kafka的partition是不可修改的有序消息序列,也能夠說是有序的消息日誌。每一個partition有本身專屬的partition號,一般是從0開始的。用戶對partition惟一能作的操做就是在消息序列的尾部追加寫入消息。partition上的每條消息都會被分配一個惟一的序列號——按照Kafka的術語來說,該序列號被稱爲位移(offset)。該位移值是從0開始順序遞增的整數。位移信息能夠惟必定位到某partition下的一條消息。
值得一提的是,Kafka的partition實際上並無太多的業務含義,它的引入就是單純地爲了提高系統的吞吐量,所以在建立kafka topic的時候能夠根據集羣實際配置設置具體的partition數,實現總體性能的最大化。
前面說過,topic partition下的每條消息都被分配一個位移值。實際上,Kafka消費者端也有位移(offset)的概念,但必定要注意這兩個offset屬於不一樣的概念,如圖1.12所示。
顯然,每條消息在某個partition的位移是固定的,但消息該partition的消費者的位移會隨着消費進度不斷前移,但終究不可能超過該分區最新一條消息的位移。在之後討論位移的時候要注意區分是生產位移仍是消費位移的。
綜合以前說的topic、partition和offset,咱們能夠斷言Kafka中的一條消息其實就是一個<topic,partition,offset>三元組(tuple),經過該元組值咱們能夠在Kafka集羣中找到惟一對應的那條消息。
分佈式系統必然要實現高可靠性,而目前實現的主要途徑仍是依靠冗餘機制——簡單地說,就是備份多份日誌。這些備份日誌在Kafka中被稱爲副本(replica),它們存在的惟一目的就是防止數據丟失。
副本分爲兩類:領導者副本(leader replica)和追隨者副本(follower replica)。follower replica 是不能提供服務給客戶端的,也就是說不負責響應客戶端發來的消息寫入和消息消費請求。它只是被動地向領導者副本(leader replica)獲取數據,而一旦leader replica所在的broker宕機,kafka會從剩餘的replica中選舉出新的leader繼續提供服務。
如前所述,Kafka的replica分爲兩個角色:領導者(leader)和追隨者(follower)。現在這種角色設定幾乎徹底取代了過去的主備的提法(Master-Slave)。和傳統主備系統(好比MySQL)不一樣的是,在這類leader-follower系統中一般只有leader對外提供服務,follower只是被動地追隨leader的狀態,保持與leader的同步。follower存在的惟一價值就是充當leader的候補:一旦leader掛掉當即就會有一個追隨者被選舉成爲新的leader接替它的工做。Kafka就是這樣的設計,如圖1.13所示。
Kafka保證同一個partition的多個replica必定不會分配在同一臺broker上。畢竟若是同一個broker上有同一個partition的多個replica,那麼將沒法實現備份冗餘的效果。
ISR的全稱是in-sync replica,翻譯過來就是與leader replica保持同步的replica集合。這是一個特別重要的概念。前面講了不少關於Kafka的副本機制,好比一個partition能夠配置N個replica,那麼這是否就意味着該partition能夠容忍N-1個replica失效而不丟失數據呢?答案是「否」!
Kafka爲partition動態維護一個replica集合。該集合中的全部replica保存的消息日誌都與leader replica保持同步狀態。只有這個集合中的replica才能被選舉爲leader,也只有該集合中全部replica都接收到了同一條消息,kafka纔會將該消息置於「已提交」狀態,即認爲這條消息發送成功。回到剛纔的問題,Kafka承諾只要這個集合中至少存在一個replica,那些「已提交」狀態的消息就不會丟失——記住這句話的兩個關鍵點:①ISR中至少存在一個「活着的」replica;②「已提交」消息。
正常狀況下,partition的全部replica(含leader replica)都應該與leader replica保持同步,即全部replica都在ISR中。由於各類各樣的緣由,一小部分replica開始落後於leader replica的進度。當滯後到必定程度時,Kafka會將這些replica「踢」出ISR。相反地,當這些replica從新「追上」了leader的進度時,那麼Kafka會將它們加回到ISR中。這一切都是自動維護的,不須要用戶進行人工干預,於是在保證了消息交付語義的同時還簡化了用戶的操做成本。
Kafka以消息引擎聞名,所以它特別適合處理生產環境中的那些流式數據。如下就是Kafka在實際應用中的一些典型使用場景。
Kafka很是適合替代傳統的消息總線(message bus)或消息代理(message broker)。傳統的這類系統擅長於解耦生產者和消費者以及批量處理消息,而這些特色Kafka都具有。除此以外,Kafka還具備更好的吞吐量特性,其內置的分區機制和副本機制既實現了高性能的消息傳輸,同時還達到了高可靠性和高容錯性。所以Kafka特別適合用於實現一個超大量級消息處理應用。
Kafka最先就是用於重建用戶行爲數據追蹤系統的。不少網站上的用戶操做都會以消息的形式發送到Kafka的某個對應的topic上。這些點擊流蘊含了巨大的商業價值,事實上,目前就有不少創業公司使用機器學習或其餘實時處理框架來幫助收集並分析用戶的點擊流數據。鑑於這種點擊流數據量時很大的,Kafka超強的吞吐量特性此時就有了用武之地。
不少企業和組織都須要對關鍵的操做和運維進行監控和審計。這就須要從各個運維應用程序處實時彙總操做步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka是很是適合的解決方案,它能夠便捷地對多路消息進行實時收集,同時因爲其持久化的特性,使得後續離線審計成爲可能。
這多是Kafka最多見的使用方式了——日誌收集彙總解決方案。每一個企業都會產生大量的服務日誌,這些日誌分散在不一樣的機器上。咱們能夠使用Kafka對它們進行全量收集,並集中送往下游的分佈式存儲中(好比HDFS等)。比起其餘主流的日誌抽取框架(好比Apache Flume),Kafka有更好的性能,並且提供了完備的可靠性解決方案,同時還保持了低延時的特色。
Event Sourcing其實是領域驅動設計(Domain-Driven Design,DDD)的名詞,它使用事件序列表示狀態變動,這種思想和Kafka的設計特性不謀而合。還記得吧,Kafka也是用不可變動的消息序列來抽象化表示業務消息的,所以Kafka特別適合做爲這種應用的後端存儲。
前面簡要提到過,不少用戶接觸到Kafka都是由於它的消息引擎功能。自0.10.0.0版本開始,Kafka社區推出了一個全新的流式處理組件Kafka Streams。這標誌着Kakfa正式進入流式處理框架俱樂部。相比Apache Storm、Apache Samza,或是最近風頭正勁的Spark Streaming,抑或是Apache Flink,Kafka Streams目前還有點差距,相信後面完善會愈來愈好。
在Kafka世界中,一般把producer和consumer通稱爲客戶端(即clients),這是與服務器(即broker)相對應的。
在Kafka 0.9.0.0版本中,社區正式使用Java版本的producer替換了原Scala版本的producer。新版本的producer的主要入口類是org.apache.kafka.clients.producer.KafkaProducer,而非原來的kafka.producer.Producer。
新版本producer重寫了以前服務器端代碼提供的不少數據結構,擺脫了對服務器端代碼庫的依賴,同時新版本的producer也再也不依賴於Zookeeper,甚至不須要和Zookeeper集羣進行直接交互,下降了系統的維護成本,也簡化了部署producer應用的開銷成本。一段典型的新版本producer代碼以下:
上面的代碼中比較關鍵的是KafkaProducer.send方法,它是實現發送邏輯的主要入口方法。新版本producer總體工做流程圖如圖2.2所示。
新版本的producer大體就是將用戶待發送的消息封裝成一個ProducerRecord對象,而後使用KafkaProducer.send方法進行發送。實際上,KafkaProducer拿到消息後對其進行序列化,而後結合本地緩存的元數據信息確立目標分區,最後寫入內存緩衝區。同時,KafkaProducer中還有一個專門的Sender I/O線程負責將緩衝區中的消息分批次發送給Kafka broker。
比起舊版本的producer,新版本在設計理念上有如下幾個特色(或者說是優點)。
新版本producer的API中比較關鍵的方法以下:
Kafka 0.9.0.0 版本不只廢棄了舊版本producer,還提供了新版本的consumer。一樣地,新版本consumer也是使用Java語言編寫的,也再也不須要依賴Zookeeper的幫助。新版本consumer的入口類是org.apache.kafka.clients.consumer.KafkaConsumer。由此也能夠看出,新版本客戶端的代碼包都是org.apache.kafka.clients,這一點須要特別注意,由於它是區分新舊客戶端的一個重要特徵。
在舊版本consumer中,消費位移(offset)的保存與管理都是依託於ZooKeeper來完成的。當數據量很大且消費很頻繁時,ZooKeeper的讀/寫性能每每容易成爲系統瓶頸。這是舊版本consumer爲人逅病的缺陷之一。而在新版本consumer中,位移的管理與保存再也不依靠ZooKeeper了,天然這個瓶頸就消失了。
一段典型的consumer代碼以下:
同理,上面代碼中比較關鍵的是KafkaConsumer.poll方法。它是實現消息消費的主邏輯入口方法。新版本consumer在設計時摒棄了舊版本多線程消費不一樣分區的思想,採用了相似於Linux epoll的輪詢機制,使得consumer只使用一個線程就能夠管理連向不一樣broker的多個Socket,既減小了線程間的開銷成本,同時也簡化了系統的設計。
比起舊版本consumer,新版本在設計上的突出優點以下:
比起舊版本而言,新版本在API設計上提供了更加豐富的功能,新版consumer API其中比較關鍵的方法以下:
和producer不一樣的是,目前新舊consumer共存於最新版本的Kafka中。
典型的生產環境至少須要部署多個節點共同組成一個分佈式集羣總體爲咱們提供服務。本章將會詳細討論生產環境中集羣的安裝、配置與驗證。不過在此以前,咱們還須要解決3個方面的問題。它們分別是操做系統的選型、硬件規劃和容量規劃。
Kafka的服務器端代碼是由Scala語言編寫的,而新版本客戶端代碼是由Java語言編寫的。和Java同樣,Scala編譯器會把源程序.scala文件編譯成.class文件,所以Scala也是JVM系的語言。所以,只要是支持Java程序部署的平臺都應該可以部署Kafka。
目前部署Kafka最多的3類操做系統分別是Linux,OS X和Windows,其中部署在Linux上的最多,而Linux也是推薦的操做系統。
如今,咱們將分別從磁盤、內存、帶寬和CPU等幾個方面探討部署Kafka集羣所必要的關鍵規劃因素。首先從磁盤開始提及。
衆所周知,Kafka是大量使用磁盤的,Kafka的每條消息都必須被持久化到底層的存儲中,而且只有被規定數量的broker成功接收後才能通知clients消息發送成功,所以消息越是被更快地保存在磁盤上,處理clients請求的延時越低,表現出來的用戶體驗也就越好。
在肯定磁盤時,一個常見的問題就是選擇普通的機械硬盤(HDD)仍是固態硬盤(SSD)。機械硬盤成本低且容量大,而SSD一般有着極低的尋道時間(seek time)和存取時間(access time),性能上的優點很大,但同時也有着很是高的成本。所以在規劃Kafka線上環境時,讀者就須要根據公司自身的實際條件進行有針對性的選型。其實,Kafka使用磁盤的方式在很大程度上抵消了SSD提供的那些突出優點。由於Kafka是順序寫磁盤,而磁盤順序I/O的性能,即便機械硬盤也是不弱的——順序I/O不須要頻繁地移動磁頭,於是節省了耗時的尋道時間。所以對於預算有限且追求高性價比的公司而言,機械硬盤徹底能夠勝任Kafka存儲的任務。
關於磁盤的選擇,另外一個比較熱門的爭論就在於,JBOD與磁盤陣列(下稱RAID)之爭。這裏的JBOD全稱是Just Bunch Of Disks,翻譯過來就是一堆普通磁盤的意思。在部署線上Kafka環境時,應當如何抉擇呢?是使用一堆普通商用磁盤進行安裝仍是搭建專屬的RAID呢?具體問題具體分析。
首先分析一下RAID與Kafka的相適性。常見的RAID是RAID 10 ,也被稱爲RAID 1+0,它結合了磁盤鏡像和磁盤條帶化兩種技術共同保護數據,既實現了不錯的性能也提供了很高的可靠性。RAID 10集合了RAID 0 和RAID 1的優勢,但在空間上使用了磁盤鏡像,所以總體的磁盤使用率只有50%。換句話說就是將通常的磁盤容量都用做提供冗餘。自Kafka 0.8.x版本,用戶就能夠使用RAID做爲存儲來爲Kafka提供服務了。事實上,根據公開的資料顯示,LinkedIn公司的Kafka集羣就是使用RAID 10做爲底層存儲的。除了默認提供的數據冗餘以外,RAID 10還能夠將數據自動地負載分佈到多個磁盤上。
因而可知,RAID做爲Kafka的底層存儲其實主要的優點有兩個。
以上兩個優點對於任何系統而言都是很是好特性。不過對於Kafka而言,Kafka在框架層面其實已經提供了這兩個特性:經過副本機制提供冗餘和高可靠性,以及經過分散到各個節點的領導者選舉機制來實現負載均衡,因此從這方面來看,RAID的優點就顯得不是那麼明顯了。實際上, 依然有不少公司和組織使用或者打算在RAID之上構建Kafka集羣。
這裏,咱們看看LinkedIn公司是怎麼作的?LinkedIn公司目前的Kafka就搭建於RAID 10之上。他們在Kafka層面設定的副本數是2,所以根據RAID 10的特性,這套集羣實際上提供了4倍的數據冗餘,且只能容忍一臺broker宕機(由於副本數是2)。若LinkedIn公司把副本數提升到3,那麼就提供了6倍的數據冗餘。這將是一筆很大的成本開銷。可是,若是咱們假設LinkedIn公司使用的是JBOD方案。雖然目前JBOD有諸多限制,但其低廉的價格和超高的性價比的確是很是大的優點。另外經過一些簡單的設置,JBOD方案能夠達到和RAID方案同樣的數據冗餘效果。好比說,若是使用JBOD而且設置副本數爲4,那麼Kafka集羣依然提供4倍的數據冗餘,可是這個方案中整個集羣能夠容忍最多3臺broker宕機而不丟失數據。對比以前的RAID方案,JBOD方案沒有犧牲任何高可靠性或是增長硬件成本,同時還提高了整個集羣的高可用性。
事實上,LinkedIn公司目前正在計劃將整個Kafka集羣從RAID 10 遷移到JBOD上。
對於通常的公司或組織而言,選擇JBOD方案的性價比更高。另外推薦用戶爲每一個broker都配置多個日誌路徑,每一個路徑都獨立掛載在不一樣的磁盤上,這使得多塊物理磁盤磁頭同時執行物理I/O寫操做,能夠極大地加速Kafka消息生產的速度。
最後關於磁盤的一個建議就是,儘可能不要使用NAS(Network Attached Storage)這樣的網絡存儲設備。對比本地存儲,人們老是認爲NAS方案速度更快也更可靠,其實否則。NAS一個很大的弊端在於,它們一般都運行在低端的硬件上,這就使得它們的性能不好,可能比一臺筆記本電腦的硬盤強不了多少,表現爲平均延時有很大的不穩定性,而幾乎全部高端的NAS設備廠商都售賣專有的硬件設備,所以成本的開銷也是一個須要考慮的因素。
綜合以上全部的考量,硬盤規劃的結論性總結以下。
Kafka集羣到底須要多大的磁盤容量?這又是一個很是經典的規劃問題。如前所述,Kafka的每條消息都保存在實際的物理磁盤中,這些消息默認會被broker保存一段時間以後清除。這段時間是能夠配置的,所以用戶能夠根據自身實際業務場景和存儲需求來大體計算線上環境所需的磁盤容量。
讓咱們以一個實際的例子來看下應該如何思考這個問題。假設在你的業務場景中,clients天天會產生1億條消息,每條消息保存兩份並保留一週的時間,平均一條消息的大小是1KB,那麼咱們須要爲Kafka規劃多少磁盤空間呢?若是天天1億條消息,那麼天天產生的消息會佔用1億 * 2 * 1KB / 1000 /1000 = 200GB的磁盤空間。咱們最好在額外預留10%的磁盤空間用於其餘數據文件(好比索引文件等)的存儲,所以在這種使用場景下天天新發送的消息將佔用210GB左右的磁盤空間。由於還要保存一週的數據,因此總體的磁盤容量規劃是210 * 7 = 1.5TB。固然,這是無壓縮的狀況。若是在clients啓用了消息壓縮,咱們能夠預估一個平均的壓縮比(好比0.5),那麼總體的磁盤容量就是0.75TB。
總之對於磁盤容量的規劃和如下多少個因素有關。
Kafka對於內存的使用可稱做其設計亮點之一。雖然在前面咱們強調了Kafka大量依靠文件系統和磁盤來保存消息,但其實它還會對消息進行緩存,而這個消息緩存的地方就是內存,具體來講是操做系統的頁緩存(page cache)。
Kafka雖然會持久化每條消息,但其實這個工做都是底層的文件系統來完成的,Kafka僅僅將消息寫入page cache而已,以後將消息「沖刷」到磁盤的任務徹底交由操做系統來完成。另外consumer在讀取消息時也會首先嚐試從該區域中查找,若是直接命中則徹底不用執行耗時的物理I/O操做,從而提高了consumer的總體性能。不管是緩衝已發送消息仍是待讀取消息,操做系統都要先開闢一塊內存區域用於存放接收的Kafka消息,所以這塊內存區域大小的設置對於Kafka的性能就顯得尤其關鍵了。
有些使人驚訝的是,Kafka對於Java堆內存的使用反而不是不少,由於Kafka中的消息一般都屬於「朝生夕滅」的對象實例,能夠很快地垃圾回收(GC)。通常狀況下,broker所需的堆內存都不會超過6GB。因此對於一臺16GB內存的機器而言,文件系統page cache的大小甚至能夠達到10~14GB!
除以上這些考量以外,用戶還須要把page cache大小與實際線上環境中設置的日誌段大小相比較。假設單個日誌段文件大小設置爲10GB,那麼你至少應該給予page cache 10GB以上的內存空間。這樣,待消費的消息有很大機率會保存在頁緩存中,故consumer可以直接命中頁緩存而無須執行緩慢的磁盤I/O讀操做。
總之對內存規劃的建議以下。
比起磁盤和內存,CPU於Kafka而言並無那麼重要——嚴格來講,Kafka不屬於計算密集型(CPU-bound)的系統,所以對於CPU須要記住一點就能夠了:追求多核而非高時鐘頻率。簡單來講,Kafka的機器有16個CPU核這件事情比該機器CPU時鐘高達4GHz更加劇要,由於Kafka可能沒法充分利用這4GHz的頻率,但幾乎確定會用滿16個CPU核。Kafka broker一般會建立幾十個後臺線程,再加上多個垃圾回收線程,多核系統顯然是最佳的配置選擇。
固然,凡事皆有例外。若clients端啓用了消息壓縮,那麼除了要爲clients機器分配足夠的CPU資源外,broker端也有可能須要大量的CPU資源——儘管Kafka 0.10.0.0改進了在broker端的消息處理,免除了解壓縮消息的負擔以節省磁盤佔用和網絡帶寬,但並不是全部狀況下均可以免這種解壓縮(好比clients端和broker端配置的消息版本號不匹配)。若出現這種狀況,用戶就須要爲broker端的機器也配置充裕的CPU資源。
基於以上的判斷依據,咱們對CPU資源規劃的建議以下。
對於Kafka這種在網絡間傳輸大量數據的分佈式數據管道而言,帶寬資源相當重要,而且特別容易成爲系統的瓶頸,所以一個快速且穩定的網絡是Kafka集羣搭建的前提條件。低延時的網絡以及高帶寬有助於實現Kafka集羣的高吞吐量以及用戶請求處理低延時。
當前主流的網絡環境皆使用以太網,帶寬主要也有兩種:1Gb/s和10Gb/s,即平時所說的千兆位網絡和萬兆位網絡。不管是哪一種帶寬,對於大多數的Kafka集羣來講都足矣了。
關於帶寬資源方面的規劃,用戶還須要注意的是儘可能避免使用跨機房的網絡環境,特別是那些跨城市甚至是跨大洲的網絡。由於這些網絡條件下請求的延時將會很是高,無論是broker端仍是clients端都須要額外作特定的配置才能適應。
綜合上述內容,咱們對帶寬資源規劃的建議以下。
下面給出一份典型的線上環境配置,用戶能夠參考這份配置以及結合本身的實際狀況進行二次調整。
接下來,須要討論Kafka集羣涉及的各方面參數,主要包括如下幾種參數。
Kafka broker端提供了不少參數用於調優系統的各個方面,有一些參數是全部Kafka環境都須要考慮和配置的,不管是單機環境仍是分佈式環境。這些參數都是Kafka broker的基礎配置,必定要明確它們的含義。
broker端參數須要在Kafka目錄下的config/server.properties文件中進行設置。當前對於絕大多數的broker端參數而言,Kafka尚不支持動態修改——這就是說,若是要新增、修改、抑或是刪除某些broker參數的話,須要重啓對應的broker服務器。下面就讓咱們來看看主要的參數配置。
除broker端參數以外,Kafka還提供了一些topic級別的參數供用戶使用。所謂的topic級別,是指broker端全局參數。每一個不一樣的topic均可以設置本身的參數值。舉一個例子來講,上面提到的日誌留存時間。顯然,在實際使用中,在全局設置一個通用的留存時間並不方便,由於每一個業務的topic可能有不一樣的留存策略。若是隻能設置全局參數,那麼勢必要取全部業務留存時間的最大值做爲全局參數值,這樣必然會形成空間的浪費。所以Kafka提供了不少topic級別的參數,常見的包括以下幾個。
Kafka broker端代碼雖然是用Scala語言編寫的,但終歸要編譯爲.class文件在JVM上運行。既然是JVM上面的應用,垃圾回收(garbage collection, GC)參數的設置就顯得很是重要。
推薦使用Java8版本,並推薦使用G1垃圾收集器。在沒有任何調優的狀況下,G1收集器自己會比CMS表現出更好的性能,主要體如今Full GC的次數更少、須要微調的參數更少等方面。所以推薦用戶始終使用G1收集器,不管是在broker端仍是在clients端。
除此以外,咱們還須要打開GC日誌的監控,並實時確保不會出現「G1HR #StartFullGC」。至於G1的其餘參數,能夠根據實際使用狀況酌情考慮作微小調整。
以前說過,Kafka推薦用戶使用最新版本的JDK——當前最新的Oracle JDK版本是1.8.0_162。另外鑑於Kafka broker主要使用的堆外內存,即大量使用操做系統的頁緩存,所以其實並不須要爲JVM分配太多的內存。在實際使用中,一般爲broker設置不超過6GB的堆空間。如下就是一份典型的生產環境中的JVM參數列表:
Kafka支持不少平臺,但到目前爲止被普遍使用並已被證實表現良好的平臺,依然是Linux平臺。目前Kafka社區在Windows平臺上已經發現了一些特有的問題,並且在Windows平臺上的工具支持也不像Linux上面那樣豐富,所以推薦應該將生產環境部署在Linux平臺上。
一般狀況下,Kafka並不須要太多的OS級別的參數調優,但依然有一些OS參數是必需要調整的。
本章會從4個方面來考量調優目標:吞吐量、延時、持久性和可用性,如圖9.2所示。
在性能調優時,吞吐量和延時是相互制約的。假設Kafka producer每發送一條消息須要花費2毫秒(即延時是2毫秒),那麼顯然producer的吞吐量就應該是500條/秒,由於1秒能夠發送1 / 0.002 = 500條消息。所以,吞吐量和延時的關係彷佛能夠使用公式來表示:TPS = 1000 / Latency(毫秒)。
其實,二者的關係遠非上面公式表示的這麼簡單。咱們依然以Kafka producer來舉例,假設它仍然以2毫秒的延時來發送消息。若是每次只發送一條消息,那麼TPS天然就是500條/秒。但若是producer不是每次發送一條消息,而是在發送前等待一段時間後統一發送一批消息。假設producer每次發送前會等待8毫秒,而8毫秒以後producer緩存了1000條消息,那此時總延時就累加到10毫秒(2+8),這時TPS等於1000 / 0.01 = 100000條/秒。因而可知,雖然延時增長了4倍,但TPS卻增長了將近200倍。
上面的場景解釋了目前爲何批次化(batching)以及微批次(micro-batching)流行的緣由。實際環境中用戶幾乎老是願意用增長較小延時的代價去換取TPS的顯著提高。值得一提的是,Kafka producer也採起了這樣的理念,這裏的8毫秒就是producer參數linger.ms所表達的含義。producer累積消息通常僅僅是將消息發送到內存中的緩衝區,而發送消息卻須要涉及網絡I/O傳輸。內存操做和I/O操做的時間量級是不一樣的,前者一般是幾百納級,然後者從毫秒到秒級別不等,故producer等待8毫秒積攢出的消息數遠遠多於同等時間內producer可以發送的消息數。
配置合理的操做系統(OS)參數可以顯著提高Kafka集羣的性能、阻止錯誤條件的發生,而OS級錯誤幾乎老是會下降系統性能,甚至影響其餘非功能性需求指標。在Kafka中常常碰到的操做系統級別錯誤可能包括以下幾種。
經過恰當的OS調優咱們就可能提早預防這些錯誤的發生,從而下降問題修復的成本。本節咱們將從如下幾個方面分別探討OS級別的調優。
因爲Kafka大量使用物理磁盤進行消息持久化,故文件系統的選擇是重要的調優步驟。對於Linux系統上的任何文件系統,Kafka都推薦用戶在掛載文件系統(mount)時設置noatime選項,即取消文件atime(最新訪問時間)屬性的更新——禁掉atime更新避免了inode訪問時間的寫入操做,所以極大地減小了文件系統寫操做數,從而提高了集羣性能。Kafka並無使用atime,所以禁掉它是安全的操做。用戶能夠使用mount -o noatime命令進行設置。
值得一提的是,Kafka雖然沒有使用atime,但卻使用了mtime,即修改時間用於日誌切分等操做。固然隨着時間戳屬性在0.10.0.0版本的引入,mtime的使用場景也大大地減小了。
Linux平臺當前有不少文件系統,最多見的當屬EXT4和XFS了。EXT4是EXT系列的最新一版,由EXT3演變提高而來。EXT4已成爲目前大部分Linux發行版的默認文件系統。鑑於EXT4是最標準的文件系統,故目前EXT4的適配性是最好的。絕大多數運行在Linux上的軟件幾乎都是基於EXT4構建和測試的,所以兼容性上EXT4要優於其餘文件系統。
而做爲高性能的64位日誌文件系統(journaling file system),XFS表現出了高性能、高伸縮性,所以特別適用於生產服務器,特別是大文件(30+ GB)操做。不少存儲類的應用都適合選擇XFS做爲底層文件系統。目前RHEL 7.0已然將XFS做爲默認的文件系統。
至於採用哪一種文件系統實際上並無統一的規定。上面兩種文件系統都能很好地與Kafka集羣進行適配。只不過在使用時每種文件系統都有一些特定的配置。
對於使用EXT4的用戶而言,Kafka建議設置如下選項。
對於XFS用戶而言,推薦設置以下參數。
通常Linux發行版會將該值默認設置爲60。不少教程和有經驗的人都建議將該值設置爲0,即徹底禁掉swap以提高內存使用率。
雖然swap開啓時的確會拖慢機器的速度,但若Kafka「吃掉」了全部的物理內存,用戶還能夠經過swap來定位應用並及時處理。假設徹底禁掉了swap,當系統耗盡全部內存(out of memory,OOM)後,Linux的OOM killer將會開啓並根據必定法則選取一個進程殺掉(kill),這個過程對用戶來講是不可見的,所以用戶徹底沒法進行干預(好比在殺掉應用前保存狀態等)。換句話說,一旦用戶關閉了swap,就意味着當OOM出現時有可能丟失一些進程的數據。所以咱們禁掉swap的前提就是要確保你的機器永遠不會出現OOM,這對於生產環境上的Kafka集羣而言,一般都是不能保證的。
建議將swap限定在一個很是小的值,好比1~10之間。這樣既預留了大部分的物理內存,同時也能保證swap機制能夠幫助用戶及時發現並處理OOM。
臨時修改swapiness能夠使用sudo sysctl vm.swappiness=N來完成;若要永久生效,用戶須要修改/etc/sysctl.conf文件增長vm.swappiness=N,而後重啓機器。
推薦用戶使用Java 8。
因爲Kakfa並未大量使用堆上內存(on-the-heap memory)而是使用堆外內存(off-the-heap memory),故不須要爲Kafka設定太大的堆空間。生產環境中6GB一般是足夠了的,要知道以LinkedIn公司1500+臺的Kafka集羣規模來講,其JVM設置中也就是6GB的堆大小。另外,因爲是Java 8,所以推薦使用G1垃圾收集器。下面給出一份典型的調優後JVM配置清單:
對於使用Java 7的用戶,能夠參考下面的清單:
使用Kafka的用戶有時候會碰到「too many files open」的錯誤,這就須要爲broker所在機器調優最大文件部署符上限。調優可參考這樣的公式:broker上可能的最大分區數 * (每一個分區平均數據量 / 平均的日誌段大小 + 3)。這裏的3是索引文件的個數。假設某個broker上將來要放置的最大分區數是20,平均每一個分區總的數據量是100GB(不考慮follower副本),每一個日誌段大小是1GB,那麼這臺broker所在機器的最大文件部署符大小就大概是20 * (100GB / 1GB + 3),即2060。固然考慮到broker還會打開多個底層的Socket資源,實際通常將該值設置得很大,好比100000。
在實際線上Linux環境中,若是單臺broker上topic數過多,用戶可能碰到java.lang.OutOfMemoryError:Map failed的嚴重錯誤。這是由於大量建立topic將極大地消耗操做系統內存,用於內存映射操做。在這種狀況下,用戶須要調整vm.max_map_count參數。具體方法能夠使用命令/sbin/sysctl -w vm.max_map_count=N來設置。該參數默認值是65536,能夠考慮爲線上環境設置更大的值,如262144甚至更大。
另外若是broker所在機器上有多塊物理磁盤,那麼一般推薦配置Kafka所有使用這些磁盤,即設置broker端參數log.dirs指定全部磁盤上的不一樣路徑。這樣Kafka能夠同時讀/寫多塊磁盤上的數據,從而提高系統吞吐量。須要注意的是,當前Kafka根據每一個日誌路徑上分區數而非磁盤容量來作負載均衡,故在實際生產環境中容易出現磁盤A上有大量剩餘空間但Kafka卻將新增的分區日誌放置到磁盤B的情形。用戶須要實時監控各個路徑上的分區數,儘可能保證不要出現過分傾斜。一旦發生上述狀況,用戶能夠執行bin/kafka-reassign-partitions.sh腳本,經過手動的分區遷移把佔用空間多的分區移動到其餘broker上來緩解這種不平衡性。
若要調優TPS,producer、broker和consumer都須要進行調整,以便讓它們在相同的時間內傳輸更多的數據。
Kafka基本的並行單元就是分區。producer在設計時就被要求可以同時向多個分區發送消息,這些消息也要可以被寫入到多個broker中供多個consumer同時消息。所以一般來講,分區數越多TPS越高,然而這並不意味着每次建立topic時要建立大量的分區,這是一個權衡(trade-off)的問題。
過多的分區可能有哪些弊端呢?首先,server/clients端將佔用更多的內存。producer默認使用緩衝區爲每一個分區緩存消息,一旦知足條件producer便會批量發出緩存的消息。看上去這是一個提高性能的設計,不過因爲該參數是分區級別的,所以若是分區不少,這部分緩存的內存佔用也會變大;而在broker端,每一個broker內部都維護了不少分區級別的元數據,好比controller、副本管理器、分區管理器等。顯然,分區數越多,緩存成本越大。
雖然無法給出統一的分區數,但用戶基本上能夠遵循下面的步驟來嘗試肯定分區數。
Kafka提供了專門的腳本kafka-producer-perf-test.sh和kafka-consumer-perf-test.sh用於計算Tp和Tc。值得說明的是,測試producer的TPS一般是很容易的,畢竟邏輯很是簡單,直接發送消息給Kafka broker便可;但測試consumer就與應用關係很大了,特別是與應用處理消息的邏輯有關。測試consumer時儘可能使用真實的消息處理邏輯,這樣測量的結果才能準確地反映線上環境。
在肯定了分區數以後,咱們分別從producer、broker和consumer這3個方面來討論如何調優TPS。
如前所述,producer是批量發送消息的,它會將消息緩存起來後在一個發送請求中統一發送它們。若要優化TPS,那麼最重要的就是調優批量發送的性能參數:批次大小(batch size)和批次發送間隔,即Java版本producer參數batch.size和linger.ms。一般狀況下,增長這兩個參數的值都會提高producer端的TPS。更大的batch size能夠令更多的消息封裝進同一個請求,故發送給broker端的總請求數會減小。此舉既減小了producer的負載,也下降了broker端的CPU請求處理開銷;而更大的linger.ms使producer等待更長的時間才發送消息,這樣就可以緩存更多的消息填滿batch,從而提高了總體的TPS。固然這樣作的弊端在於消息的延時增長了,畢竟消息不是即時發送了。
就像前面說的,分區數的增長總要有個度,當增長到某個數值後因爲鎖競爭和內存佔用過多等因素就會出現TPS的降低。在實際環境中,用戶能夠以2的倍數來逐步增長分區數進行測試,直至出現性能拐點。
除了上面這兩個參數,producer端的另外一個參數compression.type也是調優TPS的重要手段之一。對消息進行壓縮能夠極大地減小網絡傳輸量,下降網絡I/O開銷從而提高TPS。因爲壓縮是針對batch作的,所以batch的效率也直接影響壓縮率。這一般意味着batch中緩存的消息越多,壓縮率越好。當前Kafka支持GZIP、Snappy和LZ4,但因爲目前一些固有配置等緣由,Kafka + LZ4組合的性能是最好的,所以推薦在那些CPU資源充足的環境中啓用producer端壓縮,即設置compression.type=lz4。
當producer發送消息給broker時,消息被髮送到對應分區leader副本所在的broker機器上。默認狀況下,producer會等待leader broker返回發送結果,這時才能知曉這條消息是否發送成功,consumer端也只能消費那些已成功發送的消息。顯然等待leader返回這件事情也會影響producer端TPS:leader broker返回的速度越快,producer就能更快地發送下一條消息,所以TPS也就越高。producer端參數acks控制了這種行爲,默認值等於1表示leader broker把消息寫入底層文件系統即返回,無須等待follower副本的應答。用戶也能夠將acks設置成0,則表示producer端壓根不須要broker端的響應便可開啓下一條消息的發送。這種狀況會提高producer的TPS,固然是以犧牲消息持久化爲代價的。
對於Java版本producer而言,它須要建立必定大小的緩衝區來緩存消息,爲producer端參數buffer.memory的值,該參數默認是32MB,一般來講用戶是不須要調整的,但若在實際應用中發現producer在高負載狀況下常常拋出TimeoutException,則能夠考慮增長此參數的值。增長此值後,producer高負載的狀況(即阻塞)將獲得緩解,從而比以前緩存更多分區的數據,於是總體上提高了TPS。
對於Java版本consumer來講,用戶能夠調整leader副本所在broker每次返回的最小數據量來間接影響TPS——這就是consumer端參數fetch.min.bytes的做用。該參數控制了leader副本每次返回consumer的最小數據字節數。經過增長該參數值,Kafka會爲每一個FETCH請求的response填入更多的數據,從而減小了網絡開銷並提高了TPS。固然它和producer端的batch相似,在提高TPS的同時也會增長consumer的延時,這是由於該參數增長後,broker端必須花額外的時間積累更多的數據才發送response。
另外,若是機器和資源充足,最好使用多個consumer實例共同消費多分區數據。令這些實例共享相同的group id,構成consumer group並行化消費過程,可以顯著地提高consumer端TPS。在實際環境中,筆者推薦用戶最好啓動與待消費分區數相同的實例數,以保證每一個實例都能分配一個具體的分區進行消費。
對於broker,筆者推薦用戶增長參數num.replica.fetches的值。該值控制了broker端follower副本從leader副本處獲取消息的最大線程數。默認值1代表follower副本只使用一個線程去實時拉取leader處的最新消息。對於設置了acks=all的producer而言,主要的延時可能都耽誤在follower與leader同步的過程,故增長該值一般可以縮短同步的時間間隔,從而間接地提高producer端的TPS。
總結一下調優TPS的一些參數清單和要點。
broker端
producer端
consumer端
對於producer而言,延時主要是消息發送的延時,即producer發送PRODUCE請求到broker端返回請求response的時間間隔;對於consumer而言,延時衡量了consumer發送FETCH請求到broker端返回請求response的時間間隔。還有一種延時定義表示的是集羣的端到端延時(end-to-end latency),即producer端發送消息到consumer端「看到」這條消息的時間間隔。
適度地增長分區數會提高TPS,但大量分區的存在對於延時倒是損害。分區數越多,broker就須要越長的時間纔可以實現follower與leader的同步。在同步完成以前,設置acks=all的producer不會認爲該請求已完成,而consumer端更沒法看到這些未提交成功的消息,所以這樣既影響了producer端的延時也增長了consumer端的延時。若要調優延時,咱們必須限制單臺broker上的總分區數,緩解的辦法有3種:①不要建立具備超多分區數的topic;②適度地增長集羣中broker數分散分區數;③和調優TPS相似,增長num.replica.fetchers參數提高broker端的I/O並行度。
和調優TPS相反的是,調優延時要求producer端儘可能不要緩存消息,而是儘快地將消息發送出去。這就意味着最好將linger.ms參數設置成0,不要讓producer花費額外的時間去緩存待發送的消息。
相似地,不要設置壓縮類型。壓縮是用時間換空間的一種優化方式。爲了減小網絡I/O傳輸量,咱們推薦啓用消息壓縮:但爲了下降延時,咱們推薦不要啓用消息壓縮。
producer端的acks參數也是優化延時的重要手段之一。leader broker越快地發送response,producer端就能越快地發送下一批消息。該參數的默認值1實際上已是一個很是不錯的設置,但若是用戶應用對於延時有着比較高的要求,但卻可以容忍偶發的消息發送丟失,則能夠考慮將acks設置成0,在這種狀況下producer壓根不會理會broker端的response,而是持續不斷地發送消息,從而達成最低的延時。
在consumer端,用戶須要調整leader副本返回的最小數據量來間接地影響consumer延時,即fetch.min.bytes參數值。對於延時來講,默認值1已是一個很不錯的選擇,這樣可以使broker儘快地返回數據,不花費額外的時間積累消費數據。
下面總結一下調優延時的一些參數清單。
broker端
producer端
consumer端
持久性一般由冗餘來實現,而Kafka實現冗餘的手段就是備份機制(replication)—— 它保證每條Kafka消息最終會保存在多臺broker上。這樣即便單個broker崩潰,數據依然是可用的。
對於producer而言,高持久性與acks的設置息息相關。acks的設置對於調優TPS和延時都有必定的做用,但acks參數最核心的功能其實是控制producer的持久性。若要達成最高的持久性必須設置acks=all(或acks=-1),即強制leader副本等待ISR中全部副本都響應了某條消息後發送response給producer。ISR副本全都響應消息寫入意味着ISR中全部副本都已將消息寫入底層日誌,這樣只要ISR中還有副本存活,這條消息就不會丟失。
另外一個提高持久性的參數是producer端的retries。在producer發送失敗後,producer視錯誤狀況而有選擇性地自動重試發送消息。生產環境中推薦將該值設置爲一個較大的值。
producer重試,而待發送的消息可能已經發送成功,形成了同一條消息被寫入了兩次,即重複消息。自0.11.0.0版本開始,Kafka提供了冪等性producer,實現了精確一次的處理語義。冪等性producer保證同一條消息只會被broker寫入一次,所以很好地解決了這個問題。啓用冪等性producer的方法也十分簡單,只須要設置producer端參數enable.idempotence=true。
下面總結一下調優持久性的參數清單和要點。
broker端
producer端
consumer端
下面總結一下調優可用性的一些參數清單。
broker端
producer端
consumer端
參考《Apache Kafka實戰》