kafka原理解析

介紹

分佈式消息系統kafka的提供了一個生產者、緩衝區、消費者的模型html

  • broker:中間的kafka cluster,存儲消息,是由多個server組成的集羣
  • topic:kafka給消息提供的分類方式。broker用來存儲不一樣topic的消息數據
  • producer:往broker中某個topic裏面生產數據
  • consumer:往broker中某個topic獲取數據

設計思想

topic與消息

kafka將全部消息組織成多個topic的形式存儲,而每一個topic又能夠拆分紅多個partition,每一個partition又由一個一個消息組成。每一個消息都被標識了一個遞增序列號表明其進來的前後順序,並按順序存儲在partition中。java

這樣,消息就以一個個id的方式,組織起來。node

  • producer選擇一個topic,生產消息,消息會經過分配策略append到某個partition末尾
  • consumer選擇一個topic,經過id指定從哪一個位置開始消費消息。消費完成以後保留id,下次能夠從這個位置開始繼續消費,也能夠從其餘任意位置開始消費

這個id,在kafka中被稱爲offsetreact

這種組織和處理策略提供了以下好處:git

  • 消費者能夠根據需求,靈活指定offset消費
  • 保證了消息不變性,爲併發消費提供了線程安全的保證。每一個consumer都保留本身的offset,互相之間不干擾,不存在線程安全問題
  • 消息訪問的並行高效性。每一個topic中的消息被組織成多個partition,partition均勻分配到集羣server中。生產、消費消息的時候,會被路由到指定partition,減小競爭,增長了程序的並行能力
  • 增長消息系統的可伸縮性。每一個topic中保留的消息可能很是龐大,經過partition將消息切分紅多個子消息,並經過負責均衡策略將partition分配到不一樣server。這樣當機器負載滿的時候,經過擴容能夠將消息從新均勻分配
  • 保證消息可靠性。消息消費完成以後不會刪除,能夠經過重置offset從新消費,保證了消息不會丟失
  • 靈活的持久化策略。能夠經過指定時間段(如最近一天)來保存消息,節省broker存儲空間

備份

消息以partition爲單位分配到多個server,並以partition爲單位進行備份。備份策略爲:1個leader和N個followers,leader接受讀寫請求,followers被動複製leader。leader和followers會在集羣中打散,保證partition高可用github

producer

producer生產消息須要以下參數:算法

  • topic:往哪一個topic生產消息
  • partition:往哪一個partition生產消息
  • key:根據該key將消息分區到不一樣partition
  • message:消息

根據kafka源碼,能夠根據不一樣參數靈活調整生產、分區策略apache

if topic is None
    throw Error

p=None

if partition Not None
    if partition < 0 Or partition >= numPartitions
        throw Error
    p=partition
elif key Not None
    p=hash(key) % numPartitions
else
    p=round-robin() % numPartitions

send message to the partition p

上面是我翻譯的僞代碼,其中round-robin就是簡單輪詢,hash採用的是murmurhashapi

consumer

傳統消息系統有兩種模式:緩存

  • 隊列
  • 發佈訂閱

kafka經過consumer group將兩種模式統一處理

每一個consumer將本身標記consumer group名稱,以後系統會將consumer group按名稱分組,將消息複製並分發給全部分組,每一個分組只有一個consumer能消費這條消息。

因而推理出兩個極端狀況:

  • 當全部consumer的consumer group相同時,系統變成隊列模式
  • 當每一個consumer的consumer group都不相同時,系統變成發佈訂閱

多consumer併發消費消息時,容易致使消息亂序

經過限制消費者爲同步,能夠保證消息有序,可是這大大下降了程序的併發性。

kafka經過partition的概念,保證了partition內消息有序性,緩解了上面的問題。partition內消息會複製分發給全部分組,每一個分組只有一個consumer能消費這條消息。這個語義保證了某個分組消費某個分區的消息,是同步而非併發的。若是一個topic只有一個partition,那麼這個topic併發消費有序,不然只是單個partition有序。

通常消息消息系統,consumer存在兩種消費模型:

  • push:優點在於消息實時性高。劣勢在於沒有考慮consumer消費能力和飽和狀況,容易致使producer壓垮consumer
  • pull:優點在能夠控制消費速度和消費數量,保證consumer不會出現飽和。劣勢在於當沒有數據,會出現空輪詢,消耗cpu

kafka採用pull,並採用可配置化參數保證當存在數據而且數據量達到必定量的時候,consumer端才進行pull操做,不然一直處於block狀態

kakfa採用整數值consumer position來記錄單個分區的消費狀態,而且單個分區單個消息只能被consumer group內的一個consumer消費,維護簡單開銷小。消費完成,broker收到確認,position指向下次消費的offset。因爲消息不會刪除,在完成消費,position更新以後,consumer依然能夠重置offset從新消費歷史消息

消息發送語義

producer視角

  • 消息最多發送一次:producer異步發送消息,或者同步發消息但重試次數爲0
  • 消息至少發送一次:producer同步發送消息,失敗、超時都會重試
  • 消息發且僅發一次:後續版本支持

consumer視角

  • 消息最多消費一次:consumer先讀取消息,再確認position,最後處理消息
  • 消息至少消費一次:consumer先讀取消息,再處理消息,最後確認position
  • 消息消費且僅消費一次:
    1. 若是消息處理後的輸出端(如db)能保證消息更新冪等性,則屢次消費也能保證exactly once語義
    2. 若是輸出端能支持兩階段提交協議,則能保證確認position和處理輸出消息同時成功或者同時失敗
    3. 在消息處理的輸出端存儲更新後的position,保證了確認position和處理輸出消息的原子性(簡單、通用)

可用性

在kafka中,正常狀況下全部node處於同步中狀態,當某個node處於非同步中狀態,也就意味着整個系統出問題,須要作容錯處理

同步中表明了:

  • 該node與zookeeper能連通
  • 該node若是是follower,那麼consumer position與leader不能差距太大(差額可配置)

某個分區內同步中的node組成一個集合,即該分區的ISR

kafka經過兩個手段容錯:

  • 數據備份:以partition爲單位備份,副本數可設置。當副本數爲N時,表明1個leader,N-1個followers,followers能夠視爲leader的consumer,拉取leader的消息,append到本身的系統中
  • failover:
    1. 當leader處於非同步中時,系統從followers中選舉新leader
    2. 當某個follower狀態變爲非同步中時,leader會將此follower剔除ISR,當此follower恢復並完成數據同步以後再次進入ISR

另外,kafka有個保障:當producer生產消息時,只有當消息被全部ISR確認時,才表示該消息提交成功。只有提交成功的消息,才能被consumer消費

綜上所述:當有N個副本時,N個副本都在ISR中,N-1個副本都出現異常時,系統依然能提供服務

假設N副本全掛了,node恢復後會面臨同步數據的過程,這期間ISR中沒有node,會致使該分區服務不可用。kafka採用一種降級措施來處理:選舉第一個恢復的node做爲leader提供服務,以它的數據爲基準,這個措施被稱爲髒leader選舉

因爲leader是主要提供服務的,kafka broker將多個partition的leader均分在不一樣的server上以均攤風險

每一個parition都有leader,若是在每一個partition內運行選主進程,那麼會致使產生很是多選主進程。kakfa採用一種輕量級的方式:從broker集羣中選出一個做爲controller,這個controller監控掛掉的broker,爲上面的分區批量選主

一致性

上面的方案保證了數據高可用,有時高可用是體如今對一致性的犧牲上。若是但願達到強一致性,能夠採起以下措施:

  • 禁用髒leader選舉,ISR沒有node時,寧肯不提供服務也不要未徹底同步的node
  • 設置最小ISR數量min_isr,保證消息至少要被min_isr個node確認才能提交

持久化

基於如下幾點事實,kafka重度依賴磁盤而非內存來存儲消息

  • 硬盤便宜,內存貴
  • 順序讀+預讀取操做,能提升緩存命中率
  • 操做系統利用富餘的內存做爲pagecache,配合預讀取(read-ahead)+寫回(write-back)技術,從cache讀數據,寫到cache就返回(操做系統後臺flush),提升用戶進程響應速度
  • java對象實際大小比理想大小要大,使得將消息存到內存成本很高
  • 當堆內存佔用不斷增長時,gc抖動較大
  • 基於文件順序讀寫的設計思路,代碼編寫簡單

在持久化數據結構的選擇上,kafka採用了queue而不是Btree

  • kafka只有簡單的根據offset讀和append操做,因此基於queue操做的時間複雜度爲O(1),而基於Btree操做的時間複雜度爲O(logN)
  • 在大量文件讀寫的時候,基於queue的read和append只須要一次磁盤尋址,而Btree則會涉及屢次。磁盤尋址過程極大下降了讀寫性能

性能

kafka在如下四點作了優化:

  • 將大量小io改形成少許大io
  • 利用sendfile減小數據拷貝
  • 支持snappy,gzip,lz4三種算法批量壓縮消息,減小網絡傳輸消耗
  • 採用nio網絡模型,與1 acceptor thread + N processor threads的reactor線程模型

大量讀寫少許消息會致使性能較差,經過將消息聚合,能夠減小讀寫次數(減小隨機IO),增長單次讀寫數據量(增長順序IO)

普通狀況下,數據從磁盤傳輸到網絡須要經歷如下步驟:

  1. 磁盤->內核page cache
  2. 內核page cache->用戶buffer
  3. 用戶buffer->socket buffer
  4. socket buffer->NIC buffer(NIC:網卡接口)

利用sendfile系統調用,能夠簡化至:

  1. 磁盤->內核page cache
  2. 內核page cache->NIC buffer

減小了兩次拷貝步驟。在存在大量數據傳輸的操做時,會顯著提高性能

在大量文件讀寫的時候,基於queue的read和append只須要一次磁盤尋址,而Btree則會涉及屢次。磁盤尋址過程極大下降了讀寫性能

kafka server端採用與Mina同樣的網絡、線程模型。server端基於nio,採用1個acceptor線程接受tcp鏈接,並將鏈接分配給N個proccessor線程,proccessor線程執行具體的IO讀寫、邏輯處理操做。(注:相比較於這種模型,netty的N boss + N worker的模型更加靈活)

外部依賴

zookeeper

broker node在zookeeper中採用惟一id(整數)標識


/brokers/ids/[N] --> host:port 瞬時節點

  • [N] 表明分區數

此znode存儲了broker node的ip端口


/brokers/topics/[topic]/partitions/[N]/state --> leader,isr 瞬時節點

  • [topic] 表明某個topic名稱
  • [N] 表明分區數

此znode存儲了該分區的leader id和isr列表(由id組成)


/consumers/[group_id]/ids/[customer_id] --> {"topic1": #streams, ..., "topicN": #streams} 瞬時節點

  • [group_id] 消費者所屬groupid
  • [customer_id] 消費者id,結構爲 host+uuid
  • topicN 訂閱topic name
  • #streams 消費線程數量

此znode存儲了指定consumer消費topic所使用的線程數


/consumers/[group_id]/offsets/[topic]/[N] --> offset 永久節點

  • [group_id] 消費者所屬groupid
  • [topic] 訂閱topic name
  • [N] 分區數

consumer能夠經過三種方式管理offset:

  • 手動管理。使用低層次consumer api,靈活,較麻煩
  • 交給zookeeper管理。使用高層次consumer api,設置offsets.storage=zookeeper,方便,性能稍差。0.8.2默認配置
  • 交給kafka管理。使用高層次consumer api,設置offsets.storage=kafka,方便,原生態性能優。實現原理是kafka選出一個broker做爲offset manager,建立一個名爲__consumer_offsets的topic,將offset存儲在該topic下,推薦採用

此znode存儲了指定consumer在topic中最新consumer offset


/consumers/[group_id]/owners/[topic]/[N] --> consumer_id 瞬時節點

指定分區在某一時刻只能被全部consumer group中的某一個consumer消費,經過將consumer_id存在指定分區下,就能保證這時該分區只能被這個consumer消費


上面只是列出的最典型的znode,經過研究znode,能夠開發出一個kafka monitor,用來監控kafka數據消費情況,好比KafkaOffsetMonitor

參考資料

相關文章
相關標籤/搜索