《Apache Kafka 實戰》讀書筆記-認識Apache Kafkahtml
做者:尹正傑正則表達式
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。windows
一.kafka概要設計
kafka在設計初衷就是爲了解決互聯網公司的超級大量級數據的實時傳輸。爲了實現這個目標,kafka在設計之初就須要考慮如下四個方面:
第一:吞吐量/延遲
第二:消息持久化
第三:負載均衡和故障轉移
第四:伸縮性
1>.吞吐量/延時介紹
咱們先打個比方:若kafka處理一條消息須要花費2ms,那麼計算獲得的吞吐量不會超過500條消息每秒(1000ms/2ms=500條/s)。可是若咱們採用批處理(batching)的思想,假設在發送前咱們首先會等待一段時間(假設是8ms),那麼此時消息發送的延遲變成了10ms(2ms+8ms),即延遲增長了4倍,但假設在這8ms中咱們總共積累了1000條消息,那麼整個系統的吞吐量就變成了100000 條/s。此時你會發現吞吐量提高了200倍!看到micor-batch的威力了吧?這就是目前諸如Storm Trident 和 Spark Streaming等消息處理平臺所採用的批處理思想。
2>.Kafka如何作到高吞吐量,低延遲的呢?
首先,kafka的寫入操做是很快的,這主要得益於它對磁盤的使用方法的不一樣。雖然kafka會持久化全部數據到磁盤,但本質上每次寫入操做其實都只是把數據寫入到操做系統的頁緩存(page cache)中,而後由操做系統自行決定何時把頁緩存中的數據寫入磁盤上。這樣的設計由三個主要的優點:
第一:操做系統頁緩存是內存中分配的,因此消息寫入的速度很是快;
第二:kafka沒必要直接與底層的文件系統打交道。因此煩瑣的I/O操做都交由操做系統來處理;
第三:kafka寫入操做採用追加寫入(append)方式,避免了磁盤隨機寫操做(據資料統計,順序磁盤I/O速度是絕不遜色於隨機讀寫內存I/O速度。感興趣的小夥伴可使用相關工具測試一下。);
3>.Kafka的高吞吐量,低延遲的設計目標
第一:大量使用操做系統頁緩存,內存操做速度快且命中率高;
第二:Kafka不直接參與物理I/O操做,而是交由最擅長此時的操做系統來完成;
第三:採用追加寫入方式,摒棄了緩慢的磁盤隨機讀/寫操做;
第四:使用sendfile爲表明的零拷貝技術增強網絡間的數據傳輸效率;
4>.消息持久化的優勢
第一:解耦消息發送和消息消費
本質上來講,kakfa最核心的功能就是提供了生產者-消費者模式的完整解決方案。經過將消息持久化使得生產者方再也不須要直接和消費者方耦合,它只是簡單的把消息生產出來並交由kafka服務器保存便可,所以提高了總體的吞吐量。
第二:實現靈活的消息處理
不少kafka的下游子系統(接受kafka消息的系統)都有這樣的需求:對於已經處理過的消息可能在將來的某個時間點從新處理一次,即所謂的消息消息重演(message replay)。消息持久化即可以很方便地實現這樣的需求。
第三:負載均衡和故障轉移
做爲一個功能完備的分佈式系統,kafka若是隻提供了最基本的消息引擎功能確定不足以幫助它脫穎而出。一套完整的消息引擎解決方案中韓必然要提供負載均衡(load balancing)和故障轉移(fail-over)功能。
何爲負載均衡?顧名思義就是讓系統的負載根據必定的規則均衡地分配在全部參數工做的服務器上,從而最大限度的提高總體的運行效率。kafka實現負載均衡其實是經過智能化的分區領導者選舉(partition leader election)來實現的。
除了負載均衡,完備的分佈式系統還支持故障轉移,所謂故障轉移,是指當服務器意外終止時,整個集羣能夠快速的檢測到該失效(failure),並當即將該服務器上應用或服務自動轉移到其餘服務器上。故障轉移一般是「心跳」和「會話「的機制來實現的。kafka服務器支持故障轉移的方式就是使用繪畫機制。每臺kafka服務器啓動後會以會話的形式把本身註冊到zookeeper服務器上。一旦該服務運轉出現問題,與zookeeper的會話變不能維持從而超時失效,此時kafka集羣會選舉出另一臺服務器來徹底代替這臺服務器繼續提供服務。
第四:伸縮性
所謂伸縮性,英文名是scalability。伸縮性表示想分佈式系統中增長額外的計算資源(好比CPU,內存,存儲或帶寬)時吞吐量提高的能力。阻礙線性擴容的一個很常見的因素就是狀態的保存。咱們知道,不管是哪類分佈式系統,集羣的每臺服務器必定會維護不少內部狀態。若是由服務器本身來保存這些狀態信息,則必須處理一致性的問題。相反,若是服務器是無狀態的,狀態的保存和管理交與專門的協調服務來作(好比zookeeper)。那麼整個集羣的服務武器之間就無需繁重的狀態共享,者極大的下降了維護複雜度。假若要擴容集羣節點,只須要簡單的啓動新的節點集羣和進行自動負載均衡就能夠了。
Kafka正式採用了這樣的思想:每臺kafka服務器上的狀態統一交友zookeeper保管。擴展kafka集羣也只須要一步:啓動新的kafka服務器便可。固然這裏須要言明的是,在kafka服務器上並非全部的狀態信息都不保存,它只保存了很輕量級的內部狀態(好比從kakka 0.10.x版本以後,它將每一個topic的消費者的偏移量本身維護了,把這些偏移量存放到了一個叫作「__consumer_offsets」的的topic進行維護)。
二.Kafka基本概念與術語
1>.Kafka的消息格式
既然kafka的核心功能就是消息引擎,那麼對於消息的設計天然是首當其衝的時期。kafka沒有使人失望,其對消息格式的設計與保存的確有不少創新之處。首先,kakfa中的消息有不少字段組成,其中有的不少字段都是用於管理消息的原數據字段,對用戶來講是徹底透明的。kakfa的消息格式經歷過3次變遷(咱們這次暫不考慮新出的kafka 2.0.1版本,由於我並無對這個版本作深刻的調研。)他們分別稱爲V0,V1和V2版本。目前大部分用戶使用的應該仍是V1版本的消息格式。V1版本的消息格式以下圖所示:
如上圖所示(上圖摘自互聯網),消息由消息頭部,key和value組成。消息頭部包括消息的CRC碼,消息版本號,屬性,時間戳,鍵長度,和消息長度等信息。其實對於普通用戶來講,掌握一下3個字段的含義通常就夠用了:
key :
消息鍵,對消息作partition時使用,即決定消息被保存在某topic下的哪一個partition。
value:
消息體,保存實際的消息數據。
timestamp:
消息發送時間戳, 用於流式處理及其餘依賴時間的處理語義。如寶不指定,則取當前時間。
另外這裏單獨提一下消息的屬性字段,kafka爲該字段分配了一個字節,目前只使用了最低的3我爲用於保存消息的壓縮類型,其他5爲還沒有使用。當前支持4中壓縮類型:0(無壓縮),1(GZIP),2(Snappy)和3(LZ4)。關於kafka消息格式V0和V2版本,你們自行百度,推薦一片不錯的文章:更多資料請參考:https://www.cnblogs.com/qwangxiao/p/9043491.html。
其次,kafka使用緊湊的二進制字節數組來保存消息格式的字段,也就是說沒有任何多餘的比特位浪費。kafka在消息設計時特地避開了繁重的Java堆內存分配,直接使用緊湊二進制字節數組ByteBuffer而不是獨立的對象,所以咱們至少可以訪問多一倍的可用內存。按照Kafka官網的說法,在一臺32GB內存的機器上,Kafka幾乎可以用到28~30GB的物理內存,同時還沒必要擔憂GC的糟糕性能。若是使用ByteBuffer來保存一樣的消息,只須要24字節,比起純Java堆的實現減小了40%的空間佔用,好處不言而喻。這種設計的好處還包括加入了擴展的可能性。
同時,大量使用也緩存而非對內存還有一個好處:當出現Kafka broker進程崩潰時,堆內存的數據也一併小時,但頁緩存的數據易燃存在。下載Kafka broker重啓後能夠繼續提供服務,不須要再單獨「熱」緩存了。
2>.topic和partition
在概念上來講,topic只是一個邏輯概念,表明了一類消息,也能夠認爲是消息被髮送到的地方。一般咱們可使用topic來區分實際業務,好比業務A使用一個topic,業務B使用另外一個topic。從本質上說,每一個Kafka topic都由若干個partition組成,而Kafka的partition是不可修改的有序消息序列,也就是說是有序的消息日誌。每一個partition有本身專屬的partition號,一般是從0開始的。用戶堆partition我惟一能作的操做就是在消息序列的尾部追加寫入消息。
partition上的每條消息都會被分配一個惟一的序列號,按照Kafka的術語來說,該序列號被稱爲位移(offset)。該位移值是從0開始順序遞增的證書。位移信息能夠惟必定義到某partition下的一條消息。值得一提的是,Kafka的partition實際上並無太多的業務含義,它的引入就是單純的爲了提高系統的吞吐量,所以在建立Kafka topic的時候能夠根據集羣實際配置設置具體的partition數,實現總體性能的最大化。
3>.offset
上面說過,topic partition下的每條消息都被分配了一個位移值。實際上,Kafka消費者端也有位移(offset)的概念,但必定要注意這兩個offset屬於不一樣的概念。
顯然,每條消息在某個partition的位移是固定的,但消費該partition的消費者的位移是會隨着消費進度不斷遷移,但終究不可能超過該分區最新一條消息的位移。綜合以前說的topic,partition和offset,咱們能夠斷言Kafka中的一條消息其實就是一個<topic,partition,offset>三元組(tuple),經過該元組值咱們能夠在Kafka集羣中找到位移對應的那條消息。
4>.replia
既然咱們已知partition是有序的消息日誌,那麼必定不能只保存者一份日誌,不然一旦保存在partition的Kafka服務器掛掉了,其上保存的消息也就都丟失了。分佈式系統必然要實現高可靠性,而目前實現的主要途徑仍是依靠冗餘機制。換句話說,就是備份多份日誌。這些分貝日誌在Kafka中被稱爲副本(replica),它們存在的惟一目的就是防止數據丟失,這一點必定要記住!
5>.leader和follower
副本(replia)分爲兩類:領導者副本(leader replia)和追隨者副本(follower replia)。follower replica是不能提供服務給客戶端的,也就是說不負責響應客戶端發來的消息寫入和消息消費請求。它只是被動地向領導者副本(leader replia)獲取數據,而一旦leader replica 所在的broker宕機,Kafka會從剩餘的replica中選舉出新的leader繼續提供服務。
Kafka保證同一個partition的多個replica必定不會分配在同一臺broker上。畢竟若是同一個broker上有同一個partition的多個replica,那麼將沒法實現備份冗餘的效果。
6>.ISR
ISR的全稱是in-sync replica,翻譯過來就是與leader replica保持同步的replica集合。這是一個特別重要的概念。前面講了不少關於Kafka的副本機制,好比一個partition能夠配置N個replica,那麼這是否就意味着該partition能夠容忍N-1個replica實現而不丟失數據呢?答案是:「否」!
副本數對Kafka的吞吐率是有必定的影響,但極大的加強了可用性。默認狀況下Kafka的replica數量爲1,即每一個partition都有一個惟一的leader,爲了確保消息的可靠性,一般應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置爲大於1,好比3。 全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。相反的,當這些replicas從新「追上」了leader的進度時,那麼Kafka會將他們加回到ISR中。這一切都是自動維護的,不須要用戶進行人爲干預,於是在保證了消息交付語義的同時,還簡化了用戶的操做成本。後端
更多學習筆記請參考:https://www.cnblogs.com/yinzhengjie/p/9652392.html。數組
三.Kafka的使用場景
Kafka以消息引擎聞名,所以它特別適合處理生產環境中的那些流式數據。如下就是Kafka在實際應用中一些典型的使用場景。
1>.消息傳輸
Kafka很是適合替代傳統的消息總線(message bus)或消息代理(message broker)。傳統的這類系統擅長於解耦生產者和消費者以及批量處理消息,而這些特色Kafka都具有。除此以外,Kafka還具備更好的吞吐量特性,其內置的分區機制和副本機制既實現了高性能的消息傳輸,同時還達到了高性能的高容錯性。一次Kafka特別適合用於實現一個超大量級消息處理應用。
2>.網站行爲日誌追蹤
Kafka最先就是用於重建用戶行爲數據追蹤系統的。不少網站上的用戶操做都會以消息的形式發送到Kafka的某個對應的topic上。這些點擊流蘊含了巨大的商業價值,事實上,目前就有不少創業公司使用機器學習或其餘實時處理框架來幫助收集並分析用戶的點擊流數據。鑑於這種點擊流數據量是很大的,Kafka超強的吞吐量特性此時就有了用武之地。
3>.審計數據收集
不少企業和組織都須要對關鍵的操做和運維進行監控和審計。這就須要從各個方面運維應用程序處實時彙總操做步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka是很是適合的解決方案,它能夠便捷的對多路消息進行實時收集,同時因爲其持久化的特性,是的後續離線審計稱爲可能。
4>.日誌收集
這多是Kafka最多見的使用方式了(日誌收集彙總解決方案),每一個企業都會產生大量的服務日誌,這些日誌分散在不一樣的機器上。咱們可使用Kafka對他們進行全量收集,並集中往下游的分佈式存儲中(好比HDFS等)。比起其餘主流的日誌抽取框架(好比Apache Flume),Kafka有更好的性能,並且提供了完備的可靠性解決方案,同時還保持 了低延遲的特色。
5>.Event Sourcing
Event Sourcing其實是領域驅動設計(Domain-Driven Design,簡稱DDD)的名次,它使用事件序列來表示狀態變動,這種思想和Kafka的設計特性不謀而合。還記得吧,Kafka也是用不可變動的消息序列來抽象化表示業務信息的,所以Kafka特別適合做爲這種應用的後端存儲。
6>.流式處理
不少用戶接觸到Kafka都是由於它的消息存儲引擎。自0.10.0.0版本開始,Kafka社區推出了一個全新的流式組件 Kafka Streams。這標誌着Kafka正式進入流式處理框架俱樂部。相比老牌流式處理框架Apache Storm,Apache Samza,或是最近風頭正勁的Spark Streaming,抑或是Apache Flink,Kafka Streams的競爭力如何?讓咱們拭目以待吧!
四.集羣環境規劃
1>.操做系統的選型
咱們知道Kafka依賴於Java環境,所以咱們只要能在操做系統上安裝jdk理論上就能夠部署kafka環境了。沒錯,事實上kafka的確能夠運行在主流的操做系統上,好比windows,Linux,mac OS等等。可是這麼多操做系統咱們究竟應該選擇哪一個操做系統去安裝呢?爲何你們部署kafka集羣都選擇的是Linux環境呢?其實我們是能夠分析緣由的:
第一:Kafka新版本clients在設計底層網絡庫時採用了Java的Selecor機制(NIO),然後者在Linux實現機制就是epoll;可是在window平臺上,Java NIO的Selector底層是使用select模型而非IOCP實現的,只有Java NIO2擦拭使用IOCP實現的。所以這一點上,在Linux部署Kafka要在比Windows上部署可以獲得高效的I/O處理能力;
第二:對於數據網絡傳輸效率而言,Linux也更具備優點。具體來講,Kafka這種應用必然須要大量的經過網絡於磁盤進行數據傳輸,而大部分這樣的操做都是經過Java的FileChannel.transferTo方法實現的,在Linux平臺上該方法底層會調用sendfile系統調用,即採用了Linux提供的零拷貝(Zero Copy)技術。
2>.磁盤規劃
事實上,根據公開的資料顯示,LinkedIn公司的Kafka集羣就是使用RAID 10做爲底層存儲的。除了默認提供的數據冗餘以外,RAID 10 還能夠將數據自動的負載分佈到多個磁盤上。因而可知,RAID做爲Kafka的底層存儲其實主要的優點有兩個:
第一:提供冗餘的數據存儲空間;
第二:自然提供負載均衡;
以上兩個優點對於任何系統而言都是很好的特性。不過對於Kafka而言,Kafka在框架層面其實已經提供了這兩個特性:經過副本機制提供冗餘和高可靠性,以及經過分散到各個節點的領導者選舉機制來實現負載均衡,因此從這方面來看,RAID的優點就顯得不是那麼明顯了。事實上,LinkedIn公司目前正在計劃將整個Kafka集羣從RAID 10 遷移到JBOD上,只不過在整個過程當中JBOD方案須要解決當前的Kafka一些固有缺陷,好比:
第一:任意磁盤損壞都會致使broker宕機,普通磁盤損壞的機率是很大的,所以這個缺陷從某種程度上來講是致命的。不過社區正在改進這個問題,將來版本中只要爲broker配置的多塊磁盤中還有良好的磁盤,broker就不會掛掉。
第二:JBOD的管理須要更加細粒度化,目前Kafka沒有提供腳本或其餘工具用於在不一樣磁盤間進行手動分配,但這是使用JBOD方案中必要的功能。
第三:JBOD也應該提供相似於負載均衡的功能,目前只是間的依賴輪訓的方式爲副本數據選擇磁盤,後續須要提供更加豐富的策略。
結合JBOD和RAID之間的優劣對比以及LinkIn公司的實際案例,我們能夠給硬盤規劃的結論性總結以下:
第一:追求性價比的公司能夠考慮使用JBOD;
第二:使用機械硬盤徹底能夠知足Kafka集羣的使用,SSD更好(儘可能不要使用NAS(Network Attached Storage)這樣的網絡存儲設備。);
3>. 磁盤容量規劃
對於磁盤容量的規劃和如下結果因素有關:
第一:新增消息數;
第二:消息留存時間;
第四:平均消息大小;
第五:副本數;
第六:是否啓用壓縮;
4>.內存規劃
Kafka對於內存對使用可稱做其設計亮點之一。雖然在前面咱們強調了Kafka大量依靠和磁盤來保存消息,但其實它還會對消息進行緩存,而這個消息換粗你得地方就是內存,具體來講是操做系統對頁緩存(page cache)。Kafka雖然會持久化每條消息,但其實這個工做都是底層對文件系統來完成。Kafka僅僅將消息寫入page cache而已,以後將消息「flush」到磁盤對任務徹底交由操做系統來完成。
通常狀況下,broker所需的堆內存都不會超過6GB。因此對於一臺16GB內存的機器而言,文件系統page cache的大小甚至能夠達到10~14GB!總之對於內存規劃的建議以下:
第一:儘可能分配更多的內存給操做系統的page cache;
第二:不要爲broker設置過大的堆內存,最好不超過6GB;
第三:page大小至少要大於一個日誌段的大小;
5>.CPU規劃
比起磁盤和內存,CPU於kafka而言並無那麼重要,嚴格來講,kafka不屬於計算密集型(CPU-bound)的系統,所以杜宇CPU須要記住一點就能夠了:追求多核而非高時鐘頻率。我們對CPU資源規劃以下:
第一:使用多核系統,CPU核數最好大於8;
第二:若是使用Kafka 0.10.0.0以前的版本或clients端消息版本不一致(若無顯式配置,這種狀況多半由clients和broker版本不一致形成),則考慮多配置一些資源以防止消息解壓操做消耗過多CPU)。
6>.帶寬規劃
第一:儘可能使用高速網絡;
第二:根據自身網絡條件和帶寬來評估Kafka集羣機器數量;
第三:避免使用跨機房網絡;
7>.典型線上環境配置
下面給出一份典型的線上環境配置,用戶能夠參考這份配置以及結合本身的是實際狀況進行二次調整:
CPU 24核心;
內存 32GB;
磁盤 1TB 7200轉SAS盤2快;
帶寬:1Gb/s;
ulimit -n 1000000;
Socket Buffer 至少64KB,適合於跨機房網絡傳輸;
五.reblance掃盲
1>.rebalance簡介緩存
consumer group的rebalance本質上是一組協議,它規定了一個consumer group 是如何達成一致來分配訂閱topic的全部分區的。假設某個組下有20個consumer實例,該組訂閱一個有着100個分區的topic。正常狀況下,Kafka會爲每一個consumer平均分配5個分區。這個分配過程就被稱爲rebalance。服務器
當consumer成功執行rebalance後,組訂閱topic的每一個分區只會分配給組內一個consumer實例。換句話說,同一個消費者組的消費者不能同時對同一個topic的同一個分區進行消費。網絡
和舊版本consumer依託於zookeeper進行rebalance不一樣,新版本consumer使用了Kafka內置的一個全新的協議(group coordination protocol)。對於每一個組而言,Kafka的某個broker會被選舉爲組協調者(group coordinator)。coordinator負責對組對狀態進行管理,他的主要責任就是當新成員到達時促成組內全部成員達成新對分區分配方案,即coordinator負責對組執行rebalance操做。app
2>.rebalance觸發條件負載均衡
組rebalance觸發對條件有如下3個:
第一:組成員發生變動,好比新consumer加入組,或已有consumer主動離開組,再或是已有consumer崩潰時則觸發rebalance;
第二:組訂閱topic數發生變動,好比使用基於正則表達式對訂閱,當匹配正則表達式對新topic被建立時則會觸發rebalance;
第三:組訂閱topic時分區發生變動,好比使用命令行腳本增長了訂閱topic的分區數;
真實應用場景引起rebalance最多見的緣由就是違背了第一個條件(好比flume的kafka source相對於broker集羣來講就是consumer對象),特別是consumer崩潰的狀況。這裏的崩潰不必定就是指consumer進程「掛掉」或consumer進程所在的機器宕機。當consumer沒法在指定的時間內完成消息處理,那麼coordinator就認爲該consumer已經崩潰,從而引起新一輪rebalance。
我在生產環境中也使用flume消費kafka的數據到hdfs集羣上,也遇到過這種rebalance的狀況,最終分析緣由是:該group下的consumer處理消息的邏輯太重,並且事件處理時間波動很大,很是不穩定,從而致使coordinator會常常行的認爲某個consumer已經掛掉,引起rebalance。鑑於目前一次rebalance操做的開銷很大,生產環境中用戶必定要結合自身業務特色仔細調優consumer參數:「request.timeout.ms」,「max.poll.records」和「max.poll.interval.ms」這幾個參數,以免沒必要要的rebalance出現。
3>.rebalance協議
前面咱們提到過rebalance本質上是一組協議。group於coordinator共同使用這組協議完成group的rebalance。最新版本的Kafka中提供了下面5個協議來處理rebalance相關事宜。
第一:JoinGroup請求:consumer請求加入組;
第二:SyncGroup請求:group leader 把分配方案同步更新到組內全部成員中;
第三:Heartbeat請求:consumer按期向coordinator彙報心跳代表本身依然存活;
第四:LeaveGroup請求:consumer主動通知coordinator該consumer即將離組;
第五:DescribeGroup請求:查看組的全部信息,包括成員信息,協議信息,分配方案以及訂閱信息等。該請求類型主要供管理員使用。coordinator不使用該請求執行rebalance。
在rebalance過程當中,coordinator主要處理consumer發過來的joinGroup和SyncGroup請求。當consumer主動離組時會發送LeaveGroup請求給coordinator。
在成功rebalance過程當中,組內全部consumer都須要按期向coordinator發送Hearbeat請求。而每一個consumer也是根據Heartbeat請求的響應中是否包含REBALANCE_IN_PROGRESS來判斷當前group是否開啓來新一輪rebalance。
好啦~關於rebalance我們瞭解到這裏基本上就夠用來,感興趣的小夥伴能夠查看rebalance genneration,rebalance流程,rebalance監聽器等技術,咱們這裏就不用深刻探討啦~