Kafka使用zookeeper來維護集羣成員的信息。每一個broker都有一個惟一標識符,這個標識符能夠在配置文件指定,也能夠自動生成。算法
在broker停機,出現網絡分區或者長時間垃圾回收停頓時,broker會從zookeeper上斷開鏈接,此時broker在啓動時建立的臨時節點會自動從zookeeper上移除。監聽broker列表的Kafka組件會被告知該broker已移除。數據庫
在徹底關閉一個broker以後,若是使用相同的ID啓動一個全新的broker,它會當即加入集羣,並擁有與舊broker相同的分區和主題。緩存
Kafka使用主題來組織數據,每一個主題被分爲若干個分區,每一個分區有多個副本,副本被保存在broker上,每一個broker均可以保存上千個屬於不一樣主題和分區的副本。安全
副本有兩種類型:服務器
首領副本,每一個分區都有一個首領副本,爲了保證一致性,因此生產者和消費者請求都會通過這個副本。網絡
跟隨者副本,首領之外的副本都是跟隨副本,跟隨副本不處理來自客戶端的請求,它們惟一的任務時從首領那裏複製消息,保持與首領一致的狀態。若是首領發生崩潰,其中一個跟隨者就會被提高爲首領。併發
首領的另外一個任務時搞清楚哪一個跟隨者的狀態與本身時一致的,若是跟隨者10s內沒有請求任何消息,或者10s內沒有請求最新的數據,它們會被認爲是不一樣步的,若是一個副本沒法與首領保持一致,首領失效時他就不能稱爲新的首領,畢竟他沒有包含所有的消息。tcp
除了當前首領外,每一個分區都有一個首選首領,——建立主題時選的首領就是分區的首選首領,之因此叫作首選首領,是由於建立分區時,須要在broker之間均衡首領,所以咱們但願首選首領在稱爲真正的首領時,broker間的負載最終會獲得平衡。默認狀況下Kafka的auto.leader.rebalance.enable=true,它會檢查首選首領是否是當前首領,若是不是,而且該副本時同步的,那就會觸發首領選舉,讓首選首領成爲當前首領。工具
broker的大部分工做時處理客戶端,分區副本和控制器發送給分區首領的請求。Kafka提供了一個二機制協議(基於tcp),指定了請求消息的格式以及broker如何對請求做出響應——包括成功處理請求或在處理請求中遇到錯誤。客戶端發起鏈接併發送請求,broker處理請求並做出響應。broker按照請求到達的順序處理它們——這種順序保證了Kafka具備了消息隊列的特性,同時保證保存的消息時有序的。性能
全部的請求消息包含一個標準消息頭:
1.request type
2.request version
3.correlation ID——一個具備惟一性的數字,用於標識請求消息,同時也會出如今響應消息和錯誤日誌中。
4.client ID——用於標識發送的請求的客戶端
broker會在它監聽的每個端口上運行一個acceptor,這個線程會建立一個鏈接,並把它交給processor線程去處理。processor線程(網絡線程)的數量時可配置的,網絡線程負責從客戶端獲取請求消息,把他們放進請求隊列,而後從響應隊列獲取響應消息,把他們發送給客戶端。
請求消息被放到請求隊列後,IO線程會負責處理它們,下面是幾種最多見的請求類型:
1.生產請求:生產者發送的請求,它包含客戶端要寫入broker的消息
2.獲取請求:在消費者和跟隨者副本須要從broker讀取消息時發送的請求
生產請求和獲取請求都必須發送給分區的首領副本,若是broker收到一個針對特定分區的請求,而該分區的首領在另外一個broker上,那麼發送請求的客戶端會收到一個「非分區首領」的錯誤響應。Kafka客戶端要本身負責把生產請求和獲取請求發送到正確的broker上。
那麼客戶端怎麼知道往哪裏發送請求呢?
客戶端使用了另外一種請求類型,也就是元數據請求,這種請求包含了客戶端感興趣的主題列表。服務器端響應消息裏指明瞭這些主題所包含的分區,每一個分區都有哪些副本,以及哪一個副本是首領。元數據請求能夠發送給任意一個broker,由於全部broker都緩存了這些信息。
通常狀況下,客戶端會把這些信息緩存起來,並直接往目標broker上發送生產和獲取請求。它們須要時不時地發送元數據請求來刷新這些信息(刷新時間間隔經過meta.max.age.ms配置)從而知道元數據是否發生了變動——好比在新的broker加入集羣時,部分副本被移動到新broker上。另外若是客戶端收到非首領錯誤,它會嘗試重發請求以前先刷新元數據,由於這個錯誤說明客戶端正在使用過時的元數據信息。
5.4.1生產請求
Acks配置參數——該參數指定了須要多少個broker確認才能夠任務一個消息寫入時成功的。不一樣的配置對「寫入成功」的界定不一樣;
若是acks=1,那麼只要首領收到消息就認爲寫入成功;
若是acks=all,那麼須要全部的同步副本收到消息纔算寫入成功;
若是acks=0,那麼生產者把消息發送以後,徹底不須要等待broker的響應。
包含首領副本的broker在收到生產請求時,會對請求作一些驗證。
1.發送數據的用戶是否有主題寫入權限?
2.請求裏包含的acks值是否有效(只能出現0,1或all)
3.若是acks=all,是否有足夠多同步副本保證消息已經被安全寫入?(若是同步副本不足,broker能夠拒絕處理新消息。)
以後消息被寫入本地磁盤。在Linux系統上,消息會被寫到文件系統緩存裏,並不保證什麼時候會被刷新到磁盤上,Kafka不會一直等待數據被寫到磁盤上——它依賴複製功能來保證消息的持久性。
在消息被寫入分區首領以後,broker開始檢查acks配置參數——若是acks被設爲0或1,那麼broker當即返回響應;若是acks被設爲all,那麼請求會被保存在一個叫作煉獄的緩衝區,直到首領發現全部跟隨者副本都複製了消息,響應纔會返回給客戶端。
5.4.2獲取請求
broker處理獲取請求的方式與處理生產請求類似。客戶端發送請求,想broker請求主題分區具備特定偏移量的消息。客戶端能夠指定broker最多從一個分區返回多少數據。這個限定很是重要,由於客戶端須要爲broker返回的數據分配足夠的內存。若是沒有這個限制,broker返回的大量數據可能耗盡客戶端內存。若是請求的偏移量存在,broker將按照客戶端指定的數量上限從分區讀取消息,再把消息返回客戶端。Kafka使用零複製技術想客戶端發送消息——也就是說Kafka直接把消息發送到網絡通道,不須要通過任何中間緩衝區。這種技術避免了字節複製,也不須要管理內存緩存區,從而得到更好性能。
客戶端除了能夠設置返回數據的上限,也能夠設置下限。在主題消息流量不是很大的狀況下,這樣能夠減小CPU和網絡開銷。客戶端發送一個請求,broker有足夠數據才把它們返回客戶端。固然客戶端能夠設定一個超時時間,當到達時間後,即使broker沒有足夠數據,也會發送到消費者。
並不時保存在分區首領上的數據都能被客戶端讀取,大部分客戶端只能讀取已經被寫入全部同步副本的消息,分區首領知道每一個消息會被複制到哪一個副本上,在消息被寫入全部同步副本以前,是不會發給消費者的——嘗試獲取這些消息的請求會獲得空的響應而不是錯誤。
這意味着若是broker間的消息複製變慢,那麼消息到達消費者的時間也會變長,延遲時間能夠經過replica.lag.time.max.ms來配置,它指定來副本在複製消息時可被容許的最大延遲時間。
以前的Kafka消費者使用zookeeper跟蹤偏移量,以後決定把偏移量保存在特定的Kafka主題上。爲了達到這個目的,咱們不得不往協議裏增長几種請求類型在:offsetCommitRequest,offsetFetchRequest和listOffsetRequest,如今應用程序調用commitOffset方法時客戶端再也不把偏移量寫入zookeeper,而是向Kafka發送offsetCommitRequest請求。
主題的建立仍然須要經過命令行來完成,命令行工具會直接更新zookeeper裏的主題列表,broker監聽這些主題列表,在有新主題加入時,它們會收到通知。咱們正在改進Kafka,增長createTopicRequest,這樣客戶端就可直接向broker請求建立新主題了。
咱們在0.10.0版本增長了APIVersionRequest,客戶端能夠循環broker支持哪些版本的請求,而後使用正確的版本與broker通訊。
5.5物理存儲
Kafka的基本存儲單元時分區,分區沒法在多個broker間在細分。
在建立主題時,Kafka首選會決定如何在broker間分配分區,咱們要達到以下目標:
1.在broker間平均分配分區副本,
2.確保每一個分區的每一個副本分佈在不一樣的broker上
3.若是broker指定了機架信息,那麼儘量把每一個分區分配到不一樣機架的broker上。
爲分區和副本選好合適的broker以後,接下來要以爲這些分區應該使用哪一個目錄。咱們單獨爲每一個分區分配目錄,規則:計算每一個目錄的分區數量,新的分區老是被添加到數量最小的目錄裏。
5.5.2文件管理
保留數據時Kafka的一個基本特性,Kafka不會一直保留數據,也不會等到全部消費者都讀取了消息以後才能刪除消息。相反,Kafka管理員爲每一個主題配置了數據保留期限,規定了數據被刪除以前能夠保留多長時間,或者清理數據以前能夠保留的數據量大小。
由於在一個大文件裏查找和刪除消息時很費時的,也容易出錯,因此把分區分紅若干個片斷,默認狀況下,每一個片斷包含1G或一週的數據,比較小的爲準,在broker往分區寫入數據時,若是達到片斷上限,就關閉當前文件,並打開一個新文件。
當前正在寫入數據的片斷叫作活躍片斷,活躍片斷永遠不會被刪除。
5.5.3文件格式
咱們把Kafka消息和偏移量保存在文件裏,保存在磁盤上的數據格式與從生產者發送過來貨發送給消費者的消息格式時同樣的。由於使用了相同的消息格式進行磁盤存儲和網絡傳輸,Kafka可使用零複製技術給消費者發送消息,同時避免了對生產者壓縮過的消息進行解壓和再壓縮。
除了鍵值和偏移量外,消息還包含了消息大小,校驗和,消息格式版本號,壓縮算法和時間戳。
時間戳能夠時生產者發送消息的時間,也能夠是消息到達broker的時間,這個能夠配置。
Kafka附帶了一個叫作DumpLogSegment的工具,能夠查看片斷內容,顯示每一個消息的偏移量,校驗和,魔術數字節,消息大小和壓縮算法。
5.5.4索引
爲了幫助broker更快定位到指定偏移量,Kafka爲每一個分區維護了一個索引。索引把偏移量映射到片斷文件和偏移量在文件的位置。
5.5.5清理
Kafka經過改變主題的保留策略來知足這些場景(只關心最新的數據),早於保留時間的舊事件會被刪除,爲每一個鍵保留最新值,只有當應用程序的事件裏包含鍵值對時,爲這些主題設置compact策略纔有意義。
若是Kafka啓動時啓動了清理功能(經過設置log.cleaner.enabled)每一個broker會啓動一個清理管理器線程和多個清理線程,它們負責執行清理任務,這些線程會選擇污濁率較高的分區進行清理。
5.5.7被刪除的事件
若是爲了把一個鍵從系統刪除,應用程序必須發送一個包含該鍵且值爲null的消息。清理線程發現該消息時,會先進行常規清理,只保留值爲null的消息,若是消費者往數據庫複製Kafka數據,當看到這個消息時,就知道要把相關信息從數據庫裏刪除。過一段時間後,清理線程就會移除這個消息。鍵也會從Kafka消失。
5.5.8什麼時候會清理主題
就像delete策略不刪除當前活躍片斷同樣,compact策略也不會對當前片斷進行清理,只有舊片斷的消息纔會被清理。Kafka會在包含髒記錄的主題達到50%時進行清理。