深刻剖析Kafka

本文來自OPPO互聯網技術團隊,轉載請註名做者。同時歡迎關注OPPO互聯網技術團隊的公衆號:OPPO_tech,與你分享OPPO前沿互聯網技術及活動。html

Kafka是一個分佈式的基於發佈、訂閱的消息系統,具備着高吞吐、高容錯、高可靠以及高性能等特性,主要用於應用解耦、流量削峯、異步消息等場景。nginx

爲了讓你們更加深刻的瞭解Kafka內部實現原理,文中將會從主題與日誌開始介紹消息的存儲、刪除以及檢索,而後介紹其副本機制的實現原理,最後介紹生產與消費的實現原理以便更合理的應用於實際業務。git

另外,本文較長,建議點贊後再慢慢看 :)算法

1. 引言

Kafka是一個分佈式的基於發佈、訂閱的消息系統,有着強大的消息處理能力,相比與其餘消息系統,具備如下特性:bootstrap

  • 快速數據持久化,實現了O(1)時間複雜度的數據持久化能力。
  • 高吞吐,能在普通的服務器上達到10W每秒的吞吐速率。
  • 高可靠,消息持久化以及副本系統的機制保證了消息的可靠性,消息能夠屢次消費。
  • 高擴展,與其餘分佈式系統同樣,全部組件均支持分佈式、自動實現負載均衡,能夠快速便捷的擴容系統。
  • 離線與實時處理能力並存,提供了在線與離線的消息處理能力。

正是因其具備這些的優秀特性而普遍用於應用解耦、流量削峯、異步消息等場景,好比消息中間件、日誌聚合、流處理等等。segmentfault

本文將從如下幾個方面去介紹kafka:緩存

  1. 第一章簡單介紹下kafka做爲分佈式的消息發佈與訂閱系統所具有的特徵與優點
  2. 第二章節介紹kafka系統的主題與日誌,瞭解消息如何存放、如何檢索以及如何刪除
  3. 第三章節介紹kafka副本機制以瞭解kafka內部如何實現消息的高可靠
  4. 第四章節將會從消息的生產端去介紹消息的分區算法以及冪等特性的具體實現
  5. 第五章節將從消息的消費端去了解消費組、消費位移以及重平衡機制具體實現
  6. 最後章節簡單總結下本文

2. 主題與日誌

2.1 主題

主題是存儲消息的一個邏輯概念,能夠簡單理解爲一類消息的集合,由使用方去建立。Kafka中的主題通常會有多個訂閱者去消費對應主題的消息,也能夠存在多個生產者往主題中寫入消息。服務器

每一個主題又能夠劃分紅多個分區,每一個分區存儲不一樣的消息。當消息添加至分區時,會爲其分配一個位移offset(從0開始遞增),並保證分區上惟一,消息在分區上的順序由offset保證,即同一個分區內的消息是有序的,以下圖所示網絡

同一個主題的不一樣分區會分配在不一樣的節點上(broker),分區時保證Kafka集羣具備水平擴展的基礎。session

以主題nginx_access_log爲例,分區數爲3,如上圖所示。分區在邏輯上對應一個日誌(Log),物理上對應的是一個文件夾。

drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-0/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-1/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-2/

消息寫入分區時,其實是將消息寫入分區所在的文件夾中。日誌又分紅多個分片(Segment),每一個分片由日誌文件與索引文件組成,每一個分片大小是有限的(在kafka集羣的配置文件log.segment.bytes配置,默認爲1073741824byte,即1GB),當分片大小超過限制則會從新建立一個新的分片,外界消息的寫入只會寫入最新的一個分片(順序IO)。

\-rw-r--r--  1 root root    1835920 10月 11 19:18 00000000000000000000.index  
\-rw-r--r--  1 root root 1073741684 10月 11 19:18 00000000000000000000.log  
\-rw-r--r--  1 root root    2737884 10月 11 19:18 00000000000000000000.timeindex  
\-rw-r--r--  1 root root    1828296 10月 11 19:30 00000000000003257573.index  
\-rw-r--r--  1 root root 1073741513 10月 11 19:30 00000000000003257573.log  
\-rw-r--r--  1 root root    2725512 10月 11 19:30 00000000000003257573.timeindex  
\-rw-r--r--  1 root root    1834744 10月 11 19:42 00000000000006506251.index  
\-rw-r--r--  1 root root 1073741771 10月 11 19:42 00000000000006506251.log  
\-rw-r--r--  1 root root    2736072 10月 11 19:42 00000000000006506251.timeindex  
\-rw-r--r--  1 root root    1832152 10月 11 19:54 00000000000009751854.index  
\-rw-r--r--  1 root root 1073740984 10月 11 19:54 00000000000009751854.log  
\-rw-r--r--  1 root root    2731572 10月 11 19:54 00000000000009751854.timeindex  
\-rw-r--r--  1 root root    1808792 10月 11 20:06 00000000000012999310.index  
\-rw-r--r--  1 root root 1073741584 10月 11 20:06 00000000000012999310.log  
\-rw-r--r--  1 root root         10 10月 11 19:54 00000000000012999310.snapshot  
\-rw-r--r--  1 root root    2694564 10月 11 20:06 00000000000012999310.timeindex  
\-rw-r--r--  1 root root   10485760 10月 11 20:09 00000000000016260431.index  
\-rw-r--r--  1 root root  278255892 10月 11 20:09 00000000000016260431.log  
\-rw-r--r--  1 root root         10 10月 11 20:06 00000000000016260431.snapshot  
\-rw-r--r--  1 root root   10485756 10月 11 20:09 00000000000016260431.timeindex  
\-rw-r--r--  1 root root          8 10月 11 19:03 leader-epoch-checkpoint

一個分片包含多個不一樣後綴的日誌文件,分片中的第一個消息的offset將做爲該分片的基準偏移量,偏移量固定長度爲20,不夠前面補齊0,而後將其做爲索引文件以及日誌文件的文件名,如00000000000003257573.index00000000000003257573.log00000000000003257573.timeindex、相同文件名的文件組成一個分片(忽略後綴名),除了.index.timeindex.log後綴的日誌文件外其餘日誌文件,對應含義以下:

文件類型 做用
.index 偏移量索引文件,記錄<相對位移,起始地址>映射關係,其中相對位移表示該分片的第一個消息,從1開始計算,起始地址表示對應相對位移消息在分片.log文件的起始地址
.timeindex 時間戳索引文件,記錄<時間戳,相對位移>映射關係
.log 日誌文件,存儲消息的詳細信息
.snaphot 快照文件
.deleted 分片文件刪除時會先將該分片的全部文件加上.delete後綴,而後有delete-file任務延遲刪除這些文件(file.delete.delay.ms能夠設置延時刪除的的時間)
.cleaned 日誌清理時臨時文件
.swap Log Compaction 以後的臨時文件
.leader-epoch-checkpoint

2.2 日誌索引

首先介紹下.index文件,這裏以文件00000000000003257573.index爲例,首先咱們能夠經過如下命令查看該索引文件的內容,咱們能夠看到輸出結構爲<offset,position>,實際上索引文件中保存的並非offset而是相對位移,好比第一條消息的相對位移則爲0,格式化輸出時加上了基準偏移量,如上圖所示,<114,17413>表示該分片相對位移爲114的消息,其位移爲3257573+114,即3257687,position表示對應offset在.log文件的物理地址,經過.index索引文件則能夠獲取對應offset所在的物理地址。索引採用稀疏索引的方式構建,並不保證分片中的每一個消息都在索引文件有映射關係(.timeindex索引也是相似),主要是爲了節省磁盤空間、內存空間,由於索引文件最終會映射到內存中。

\# 查看該分片索引文件的前10條記錄  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |head \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index  
offset: 3257687 position: 17413  
offset: 3257743 position: 33770  
offset: 3257799 position: 50127  
offset: 3257818 position: 66484  
offset: 3257819 position: 72074  
offset: 3257871 position: 87281  
offset: 3257884 position: 91444  
offset: 3257896 position: 95884  
offset: 3257917 position: 100845  
\# 查看該分片索引文件的後10條記錄  
$ bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |tail \-n 10  
offset: 6506124 position: 1073698512  
offset: 6506137 position: 1073702918  
offset: 6506150 position: 1073707263  
offset: 6506162 position: 1073711499  
offset: 6506176 position: 1073716197  
offset: 6506188 position: 1073720433  
offset: 6506205 position: 1073725654  
offset: 6506217 position: 1073730060  
offset: 6506229 position: 1073734174  
offset: 6506243 position: 1073738288
好比查看offset爲 6506155的消息:首先根據offset找到對應的分片,65061所對應的分片爲 00000000000003257573,而後經過二分法在 00000000000003257573.index文件中找到不大於6506155的最大索引值,獲得<offset: 6506150, position: 1073707263>,而後從 00000000000003257573.log的1073707263位置開始順序掃描找到offset爲650155的消息

Kafka從0.10.0.0版本起,爲分片日誌文件中新增了一個.timeindex的索引文件,能夠根據時間戳定位消息。一樣咱們能夠經過腳本kafka-dump-log.sh查看時間索引的文件內容。

\# 查看該分片時間索引文件的前10條記錄  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |head \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex  
timestamp: 1570792689308 offset: 3257685  
timestamp: 1570792689324 offset: 3257742  
timestamp: 1570792689345 offset: 3257795  
timestamp: 1570792689348 offset: 3257813  
timestamp: 1570792689357 offset: 3257867  
timestamp: 1570792689361 offset: 3257881  
timestamp: 1570792689364 offset: 3257896  
timestamp: 1570792689368 offset: 3257915  
timestamp: 1570792689369 offset: 3257927  
​  
\# 查看該分片時間索引文件的前10條記錄  
bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |tail \-n 10  
Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex  
timestamp: 1570793423474 offset: 6506136  
timestamp: 1570793423477 offset: 6506150  
timestamp: 1570793423481 offset: 6506159  
timestamp: 1570793423485 offset: 6506176  
timestamp: 1570793423489 offset: 6506188  
timestamp: 1570793423493 offset: 6506204  
timestamp: 1570793423496 offset: 6506214  
timestamp: 1570793423500 offset: 6506228  
timestamp: 1570793423503 offset: 6506240  
timestamp: 1570793423505 offset: 6506248
好比我想查看時間戳 1570793423501開始的消息:1.首先定位分片,將 1570793423501與每一個分片的最大時間戳進行對比(最大時間戳取時間索引文件的最後一條記錄時間,若是時間爲0則取該日誌分段的最近修改時間),直到找到大於或等於 1570793423501的日誌分段,所以會定位到時間索引文件 00000000000003257573.timeindex,其最大時間戳爲 1570793423505;2.經過二分法找到大於或等於 1570793423501的最大索引項,即<timestamp: 1570793423503 offset: 6506240>(6506240爲offset,相對位移爲3247667);3.根據相對位移3247667去索引文件中找到不大於該相對位移的最大索引值<3248656,1073734174>;4.從日誌文件 00000000000003257573.log的1073734174位置處開始掃描,查找不小於 1570793423501的數據。

2.3 日誌刪除

與其餘消息中間件不一樣的是,Kafka集羣中的消息不會由於消費與否而刪除,跟日誌同樣消息最終會落盤,並提供對應的策略週期性(經過參數log.retention.check.interval.ms來設置,默認爲5分鐘)執行刪除或者壓縮操做(broker配置文件log.cleanup.policy參數若是爲「delete」則執行刪除操做,若是爲「compact」則執行壓縮操做,默認爲「delete」)。

2.3.1 基於時間的日誌刪除

參數 默認值 說明
log.retention.hours 168 日誌保留時間(小時)
log.retention.minutes 日誌保留時間(分鐘),優先級大於小時
log.retention.ms 日誌保留時間(毫秒),優先級大於分鐘

當消息在集羣保留時間超過設定閾值(log.retention.hours,默認爲168小時,即七天),則須要進行刪除。這裏會根據分片日誌的最大時間戳來判斷該分片的時間是否知足刪除條件,最大時間戳首先會選取時間戳索引文件中的最後一條索引記錄,若是對應的時間戳值大於0則取該值,不然爲最近一次修改時間。

這裏不直接選取最後修改時間的緣由是避免分片日誌的文件被無心篡改而致使其時間不許。

若是剛好該分區下的全部日誌分片均已過時,那麼會先生成一個新的日誌分片做爲新消息的寫入文件,而後再執行刪除參數。

2.3.2 基於空間的日誌刪除

參數 默認值 說明
log.retention.bytes 1073741824(即1G),默認未開啓,即無窮大 日誌文件總大小,並非指單個分片的大小
log.segment.bytes 1073741824(即1G) 單個日誌分片大小

首先會計算待刪除的日誌大小diff(totalSize-log.rentention.bytes),而後從最舊的一個分片開始查看能夠執行刪除操做的文件集合(若是diff-segment.size>=0,則知足刪除條件),最後執行刪除操做。

2.3.3 基於日誌起始偏移量的日誌刪除

通常狀況下,日誌文件的起始偏移量(logStartOffset)會等於第一個日誌分段的baseOffset,可是其值會由於刪除消息請求而增加,logStartOffset的值其實是日誌集合中的最小消息,而小於這個值的消息都會被清理掉。如上圖所示,咱們假設logStartOffset=7421048,日誌刪除流程以下:

  • 從最舊的日誌分片開始遍歷,判斷其下一個分片的baseOffset是否小於或等於logStartOffset值,若是知足,則須要刪除,所以第一個分片會被刪除。
  • 分片二的下一個分片baseOffset=6506251<7421048,因此分片二也須要刪除。
  • 分片三的下一個分片baseOffset=9751854>7421048,因此分片三不會被刪除。

2.4 日誌壓縮

前面提到當broker配置文件log.cleanup.policy參數值設置爲「compact」時,則會執行壓縮操做,這裏的壓縮跟普通意義的壓縮不同,這裏的壓縮是指將相同key的消息只保留最後一個版本的value值,以下圖所示,壓縮以前offset是連續遞增,壓縮以後offset遞增可能不連續,只保留5條消息記錄。

Kafka日誌目錄下cleaner-offset-checkpoint文件,用來記錄每一個主題的每一個分區中已經清理的偏移量,經過這個偏移量能夠將分區中的日誌文件分紅兩個部分:clean表示已經壓縮過;dirty表示還未進行壓縮,以下圖所示(active segment不會參與日誌的壓縮操做,由於會有新的數據寫入該文件)。

\-rw-r--r--  1 root root    4 10月 11 19:02 cleaner-offset-checkpoint  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-0/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-1/  
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx\_access\_log-2/  
\-rw-r--r--  1 root root    0 9月  18 09:50 .lock  
\-rw-r--r--  1 root root    4 10月 16 11:19 log-start-offset-checkpoint  
\-rw-r--r--  1 root root   54 9月  18 09:50 meta.properties  
\-rw-r--r--  1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint  
\-rw-r--r--  1 root root 1518 10月 16 11:19 replication-offset-checkpoint  
​  
#cat cleaner-offset-checkpoint  
nginx\_access\_log 0 5033168  
nginx\_access\_log 1 5033166  
nginx\_access\_log 2 5033168

日誌壓縮時會根據dirty部分數據佔日誌文件的比例(cleanableRatio)來判斷優先壓縮的日誌,而後爲dirty部分的數據創建key與offset映射關係(保存對應key的最大offset)存入SkimpyoffsetMap中,而後複製segment分段中的數據,只保留SkimpyoffsetMap中記錄的消息,壓縮以後的相關日誌文件大小會減小,爲了不出現太小的日誌文件與索引文件,壓縮時會對全部的segment進行分組(一個組的分片大小不會超過設置的log.segment.bytes值大小),同一個分組的多個分片日誌壓縮以後變成一個分片。

如上圖所示,全部消息都還沒壓縮前 clean checkpoint值爲0,表示該分區的數據還沒進行壓縮,第一次壓縮後,以前每一個分片的日誌文件大小都有所減小,同時會移動 clean checkpoint的位置到這一次壓縮結束的offset值。第二次壓縮時,會將前兩個分片{0.5GB,0.4GB}組成一個分組,{0.7GB,0.2GB}組成一個分組進行壓縮,以此類推。

如上圖所示,日誌壓縮的主要流程以下:

  1. 計算deleteHorizonMs值:當某個消息的value值爲空時,該消息會被保留一段時間,超時以後會在下一次的得日誌壓縮中被刪除,因此這裏會計算deleteHorizonMs,根據該值肯定能夠刪除value值爲空的日誌分片。(deleteHorizonMs = clean部分的最後一個分片的lastModifiedTime - deleteRetionMs,deleteRetionMs經過配置文件log.cleaner.delete.retention.ms配置,默認爲24小時)。
  2. 肯定壓縮dirty部分的offset範圍[firstDirtyOffset,endOffset):其中firstDirtyOffset表示dirty的起始位移,通常會等於clear checkpoint值,firstUncleanableOffset表示不能清理的最小位移,通常會等於活躍分片的baseOffset,而後從firstDirtyOffset位置開始遍歷日誌分片,並填充key與offset的映射關係至SkimpyoffsetMap中,當該map被填充滿或到達上限firstUncleanableOffset時,就能夠肯定日誌壓縮上限endOffset
  3. 將[logStartOffset,endOffset)中的日誌分片進行分組,而後按照分組的方式進行壓縮。

3. 副本

kafka支持消息的冗餘備份,能夠設置對應主題的副本數(--replication-factor參數設置主題的副本數可在建立主題的時候指定,offsets.topic.replication.factor設置消費主題_consumer_offsets副本數,默認爲3),每一個副本包含的消息同樣(但不是徹底一致,可能從副本的數據較主副本稍微有些落後)。每一個分區的副本集合中會有一個副本被選舉爲主副本(leader),其餘爲從副本,全部的讀寫請求由主副本對外提供,從副本負責將主副本的數據同步到本身所屬分區,若是主副本所在分區宕機,則會從新選舉出新的主副本對外提供服務。

3.1 ISR集合

ISR(In-Sync Replica)集合,表示目前能夠用的副本集合,每一個分區中的leader副本會維護此分區的ISR集合。這裏的可用是指從副本的消息量與主副本的消息量相差不大,加入至ISR集合中的副本必須知足如下幾個條件:

  1. 副本所在節點須要與ZooKeeper維持心跳。
  2. 從副本的最後一條消息的offset須要與主副本的最後一條消息offset差值不超過設定閾值(replica.lag.max.messages)或者副本的LEO落後於主副本的LEO時長不大於設定閾值(replica.lag.time.max.ms),官方推薦使用後者判斷,並在新版本kafka0.10.0移除了replica.lag.max.messages參數。

若是從副本不知足以上的任意條件,則會將其提出ISR集合,當其再次知足以上條件以後又會被從新加入集合中。ISR的引入主要是解決同步副本與異步複製兩種方案各自的缺陷(同步副本中若是有個副本宕機或者超時就會拖慢該副本組的總體性能;若是僅僅使用異步副本,當全部的副本消息均遠落後於主副本時,一旦主副本宕機從新選舉,那麼就會存在消息丟失狀況)

3.2 HW&LEO

HW(High Watermark)是一個比較特殊的offset標記,消費端消費時只能拉取到小於HW的消息而HW及以後的消息對於消費者來講是不可見的,該值由主副本管理,當ISR集合中的所有從副本都拉取到HW指定消息以後,主副本會將HW值+1,即指向下一個offset位移,這樣能夠保證HW以前消息的可靠性。

LEO(Log End Offset)表示當前副本最新消息的下一個offset,全部副本都存在這樣一個標記,若是是主副本,當生產端往其追加消息時,會將其值+1。當從副本從主副本成功拉取到消息時,其值也會增長。

3.2.1 從副本更新LEO與HW

從副本的數據是來自主副本,經過向主副本發送fetch請求獲取數據,從副本的LEO值會保存在兩個地方,一個是自身所在的節點),一個是主副本所在節點,自身節點保存LEO主要是爲了更新自身的HW值,主副本保存從副本的LEO也是爲了更新其HW。當從副本每寫入一條新消息就會增長其自身的LEO,主副本收到從副本的fetch請求,會先從自身的日誌中讀取對應數據,在數據返回給從副本以前會先去更新其保存的從副本LEO值。一旦從副本數據寫入完成,就會嘗試更新本身的HW值,比較LEO與fetch響應中主副本的返回HW,取最小值做爲新的HW值。

3.2.2 主副本更新LEO與HW

主副本有日誌寫入時就會更新其自身的LEO值,與從副本相似。而主副本的HW值是分區的HW值,決定分區數據對應消費端的可見性,如下四種狀況,主副本會嘗試更新其HW值:

  • 副本成爲主副本:當某個副本成爲主副本時,kafka會嘗試更新分區的HW值。
  • broker出現奔潰致使副本被踢出ISR集合:若是有broker節點奔潰則會看是否影響對應分區,而後會去檢查分區的HW值是否須要更新。
  • 生成端往主副本寫入消息時:消息寫入會增長其LEO值,此時會查看是否須要修改HW值。
  • 主副本接受到從副本的fetch請求時:主副本在處理從副本的fetch請求時會嘗試更新分區HW值。

前面是去嘗試更新HW,可是不必定會更新,主副本上保存着從副本的LEO值與自身的LEO值,這裏會比較全部知足條件的副本LEO值,並選擇最小的LEO值最爲分區的HW值,其中知足條件的副本是指知足如下兩個條件之一:

  • 副本在ISR集合中
  • 副本的LEO落後於主副本的LEO時長不大於設定閾值(replica.lag.time.max.ms,默認爲10s)

3.3 數據丟失場景

前面提到若是僅僅依賴HW來進行日誌截斷以及水位的判斷會存在問題,如上圖所示,假定存在兩個副本A、副本B,最開始A爲主副本,B爲從副本,且參數min.insync.replicas=1,即ISR只有一個副本時也會返回成功:

  • 初始狀況爲主副本A已經寫入了兩條消息,對應HW=1,LEO=2,LEOB=1,從副本B寫入了一條消息,對應HW=1,LEO=1。
  • 此時從副本B向主副本A發起fetchOffset=1請求,主副本收到請求以後更新LEOB=1,表示副本B已經收到了消息0,而後嘗試更新HW值,min(LEO,LEOB)=1,即不須要更新,而後將消息1以及當前分區HW=1返回給從副本B,從副本B收到響應以後寫入日誌並更新LEO=2,而後更新其HW=1,雖然已經寫入了兩條消息,可是HW值須要在下一輪的請求才會更新爲2。
  • 此時從副本B重啓,重啓以後會根據HW值進行日誌截斷,即消息1會被刪除。
  • 從副本B向主副本A發送fetchOffset=1請求,若是此時主副本A沒有什麼異常,則跟第二步驟同樣沒有什麼問題,假設此時主副本也宕機了,那麼從副本B會變成主副本。
  • 當副本A恢復以後會變成從副本並根據HW值進行日誌截斷,即把消息1丟失,此時消息1就永久丟失了。

3.4 數據不一致場景

如圖所示,假定存在兩個副本A、副本B,最開始A爲主副本,B爲從副本,且參數min.insync.replicas=1,即ISR只有一個副本時也會返回成功:

  • 初始狀態爲主副本A已經寫入了兩條消息對應HW=1,LEO=2,LEOB=1,從副本B也同步了兩條消息,對應HW=1,LEO=2。
  • 此時從副本B向主副本發送fetchOffset=2請求,主副本A在收到請求後更新分區HW=2並將該值返回給從副本B,若是此時從副本B宕機則會致使HW值寫入失敗。
  • 咱們假設此時主副本A也宕機了,從副本B先恢復併成爲主副本,此時會發生日誌截斷,只保留消息0,而後對外提供服務,假設外部寫入了一個消息1(這個消息與以前的消息1不同,用不一樣的顏色標識不一樣消息)。
  • 等副本A起來以後會變成從副本,不會發生日誌截斷,由於HW=2,可是對應位移1的消息實際上是不一致的

3.5 leader epoch機制

HW值被用於衡量副本備份成功與否以及出現失敗狀況時候的日誌截斷依據可能會致使數據丟失與數據不一致狀況,所以在新版的Kafka(0.11.0.0)引入了leader epoch概念,leader epoch表示一個鍵值對<epoch, offset>,其中epoch表示leader主副本的版本號,從0開始編碼,當leader每變動一次就會+1,offset表示該epoch版本的主副本寫入第一條消息的位置,好比<0,0>表示第一個主副本從位移0開始寫入消息,<1,100>表示第二個主副本版本號爲1並從位移100開始寫入消息,主副本會將該信息保存在緩存中並按期寫入到checkpoint文件中,每次發生主副本切換都會去從緩存中查詢該信息,下面簡單介紹下leader epoch的工做原理:

  • 每條消息會都包含一個4字節的leader epoch number值
  • 每一個log目錄都會建立一個leader epoch sequence文件用來存放主副本版本號以及開始位移。
  • 當一個副本成爲主副本以後,會在leader epoch sequence文件末尾添加一條新的記錄,而後每條新的消息就會變成新的leader epoch值。
  • 當某個副本宕機重啓以後,會進行如下操做:

    • 從leader epoch sequence文件中恢復全部的leader epoch。
    • 向分區主副本發送LeaderEpoch請求,請求包含了從副本的leader epoch sequence文件中的最新leader epoch值。
    • 主副本返回從副本對應LeaderEpoch的lastOffset,返回的lastOffset分爲兩種狀況,一種是返回比從副本請求中leader epoch版本大1的開始位移,另一種是與請求中的leader epoch相等則直接返回當前主副本的LEO值。
    • 若是從副本的leader epoch開始位移大於從leader中返回的lastOffset,那麼會將從副本的leader epoch sequence值保持跟主副本一致。
    • 從副本截斷本地消息到主副本返回的LastOffset所在位移處。
    • 從副本開始從主副本開始拉取數據。
    • 在獲取數據時,若是從副本發現消息中的leader epoch值比自身的最新leader epoch值大,則會將該leader epoch 值寫到leader epoch sequence文件,而後繼續同步文件。

下面看下leader epoch機制如何避免前面提到的兩種異常場景

3.5.1 數據丟失場景解決

  • 如圖所示,當從副本B重啓以後向主副本A發送offsetsForLeaderEpochRequest,epoch主從副本相等,則A返回當前的LEO=2,從副本B中沒有任何大於2的位移,所以不須要截斷。
  • 當從副本B向主副本A發送fetchoffset=2請求時,A宕機,因此從副本B成爲主副本,並更新epoch值爲<epoch=1, offset=2>,HW值更新爲2。
  • 當A恢復以後成爲從副本,並向B發送fetcheOffset=2請求,B返回HW=2,則從副本A更新HW=2。
  • 主副本B接受外界的寫請求,從副本A向主副本A不斷髮起數據同步請求。

從上能夠看出引入leader epoch值以後避免了前面提到的數據丟失狀況,可是這裏須要注意的是若是在上面的第一步,從副本B起來以後向主副本A發送offsetsForLeaderEpochRequest請求失敗,即主副本A同時也宕機了,那麼消息1就會丟失,具體可見下面數據不一致場景中有提到。

3.5.2 數據不一致場景解決

  • 從副本B恢復以後向主副本A發送offsetsForLeaderEpochRequest請求,因爲主副本也宕機了,所以副本B將變成主副本並將消息1截斷,此時接受到新消息1的寫入。
  • 副本A恢復以後變成從副本並向主副本A發送offsetsForLeaderEpochRequest請求,請求的epoch值小於主副本B,所以主副本B會返回epoch=1時的開始位移,即lastoffset=1,所以從副本A會截斷消息1。
  • 從副本A從主副本B拉取消息,並更新epoch值<epoch=1, offset=1>。

能夠看出epoch的引入避免的數據不一致,可是兩個副本均宕機,則仍是存在數據丟失的場景,前面的全部討論都是創建在min.insync.replicas=1的前提下,所以須要在數據的可靠性與速度方面作權衡。

4. 生產者

4.1 消息分區選擇

生產者的做用主要是生產消息,將消息存入到Kafka對應主題的分區中,具體某個消息應該存入哪一個分區,有如下三個策略決定(優先級由上到下,依次遞減):

  • 若是消息發送時指定了消息所屬分區,則會直接發往指定分區。
  • 若是沒有指定消息分區,可是設置了消息的key,則會根據key的哈希值選擇分區。
  • 若是前二者均不知足,則會採用輪詢的方式選擇分區。

4.2 ack參數設置及意義

生產端往kafka集羣發送消息時,能夠經過request.required.acks參數來設置數據的可靠性級別

  • 1:默認爲1,表示在ISR中的leader副本成功接收到數據並確認後再發送下一條消息,若是主節點宕機則可能出現數據丟失場景,詳細分析可參考前面提到的副本章節。
  • 0:表示生產端不須要等待節點的確認就能夠繼續發送下一批數據,這種狀況下數據傳輸效率最高,可是數據的可靠性最低。
  • -1:表示生產端須要等待ISR中的全部副本節點都收到數據以後纔算消息寫入成功,可靠性最高,可是性能最低,若是服務端的min.insync.replicas值設置爲1,那麼在這種狀況下容許ISR集合只有一個副本,所以也會存在數據丟失的狀況。

4.3 冪等特性

所謂的冪等性,是指一次或者屢次請求某一個資源對於資源自己應該具備一樣的結果(網絡超時等問題除外),通俗一點的理解就是同一個操做任意執行屢次產生的影響或效果與一次執行影響相同,冪等的關鍵在於服務端可否識別出請求是否重複,而後過濾掉這些重複請求,一般狀況下須要如下信息來實現冪等特性:

  • 惟一標識:判斷某個請求是否重複,須要有一個惟一性標識,而後服務端就能根據這個惟一標識來判斷是否爲重複請求。
  • 記錄已經處理過的請求:服務端須要記錄已經處理過的請求,而後根據惟一標識來判斷是不是重複請求,若是已經處理過,則直接拒絕或者不作任何操做返回成功。

kafka中Producer端的冪等性是指當發送同一條消息時,消息在集羣中只會被持久化一次,其冪等是在如下條件中才成立:

  • 只能保證生產端在單個會話內的冪等,若是生產端由於某些緣由意外掛掉而後重啓,此時是沒辦法保證冪等的,由於這時沒辦法獲取到以前的狀態信息,即沒法作到垮會話級別的冪等。
  • 冪等性不能垮多個主題分區,只能保證單個分區內的冪等,涉及到多個消息分區時,中間的狀態並無同步。

若是要支持垮會話或者垮多個消息分區的狀況,則須要使用kafka的事務性來實現。

爲了實現生成端的冪等語義,引入了Producer ID(PID)與Sequence Number的概念:

  • Producer ID(PID):每一個生產者在初始化時都會分配一個惟一的PID,PID的分配對於用戶來講是透明的。
  • Sequence Number(序列號):對於給定的PID而言,序列號從0開始單調遞增,每一個主題分區均會產生一個獨立序列號,生產者在發送消息時會給每條消息添加一個序列號。broker端緩存了已經提交消息的序列號,只有比緩存分區中最後提交消息的序列號大1的消息纔會被接受,其餘會被拒絕。

4.3.1 生產端消息發送流程

下面簡單介紹下支持冪等的消息發送端工做流程

  1. 生產端經過Kafkaproducer會將數據添加到RecordAccumulator中,數據添加時會判斷是否須要新建一個ProducerBatch。
  2. 生產端後臺啓動發送線程,會判斷當前的PID是否須要重置,重置的緣由是由於某些消息分區的batch重試屢次仍然失敗最後由於超時而被移除,這個時候序列號沒法連續,致使後續消息沒法發送,所以會重置PID,並將相關緩存信息清空,這個時候消息會丟失。
  3. 發送線程判斷是否須要新申請PID,若是須要則會阻塞直到獲取到PID信息。
  4. 發送線程在調用sendProducerData()方法發送數據時,會進行如下判斷:

    • 判斷主題分區是否能夠繼續發送、PID是否有效、若是是重試batch須要判斷以前的batch是否發送完成,若是沒有發送完成則會跳過當前主題分區的消息發送,直到前面的batch發送完成。
    • 若是對應ProducerBatch沒有分配對應的PID與序列號信息,則會在這裏進行設置。

4.3.2 服務端消息接受流程

服務端(broker)在收到生產端發送的數據寫請求以後,會進行一些判斷來決定是否能夠寫入數據,這裏也主要介紹關於冪等相關的操做流程。

  1. 若是請求設置了冪等特性,則會檢查是否對ClusterResource有IdempotentWrite權限,若是沒有,則會返回錯誤CLUSTER_AUTHORIZATION_FAILED
  2. 檢查是否有PID信息。
  3. 根據batch的序列號檢查該batch是否重複,服務端會緩存每一個PID對應主題分區的最近5個batch信息,若是有重複,則直接返回寫入成功,可是不會執行真正的數據寫入操做。
  4. 若是有PID且非重複batch,則進行如下操做:

    • 判斷該PID是否已經存在緩存中。
    • 若是不存在則判斷序列號是不是從0開始,若是是則表示爲新的PID,在緩存中記錄PID的信息(包括PID、epoch以及序列號信息),而後執行數據寫入操做;若是不存在可是序列號不是從0開始,則直接返回錯誤,表示PID在服務端以及過時或者PID寫的數據已通過期。
    • 若是PID存在,則會檢查PID的epoch版本是否與服務端一致,若是不一致且序列號不是從0開始,則返回錯誤。若是epoch不一致可是序列號是從0開始,則能夠正常寫入。
    • 若是epoch版本一致,則會查詢緩存中最近一次序列號是否連續,不連續則會返回錯誤,不然正常寫入。

5. 消費者

消費者主要是從Kafka集羣拉取消息,而後進行相關的消費邏輯,消費者的消費進度由其自身控制,增長消費的靈活性,好比消費端能夠控制重複消費某些消息或者跳過某些消息進行消費。

5.1 消費組

多個消費者能夠組成一個消費組,每一個消費者只屬於一個消費組。消費組訂閱主題的每一個分區只會分配給該消費組中的某個消費者處理,不一樣的消費組之間彼此隔離無依賴。同一個消息只會被消費組中的一個消費者消費,若是想要讓同一個消息被多個消費者消費,那麼每一個消費者須要屬於不一樣的消費組,且對應消費組中只有該一個消費者,消費組的引入能夠實現消費的「獨佔」或「廣播」效果。

  • 消費組下能夠有多個消費者,個數支持動態變化。
  • 消費組訂閱主題下的每一個分區只會分配給消費組中的一個消費者。
  • group.id標識消費組,相同則屬於同一消費組。
  • 不一樣消費組之間相互隔離互不影響。

如圖所示,消費組1中包含兩個消費者,其中消費者1分配消費分區0,消費者2分配消費分區1與分區2。此外消費組的引入還支持消費者的水平擴展及故障轉移,好比從上圖咱們能夠看出消費者2的消費能力不足,相對消費者1來講消費進度比較落後,咱們能夠往消費組裏面增長一個消費者以提升其總體的消費能力,以下圖所示。

假設消費者1所在機器出現宕機,消費組會發送重平衡,假設將分區0分配給消費者2進行消費,以下圖所示。同個消費組中消費者的個數不是越多越好,最大不能超過主題對應的分區數,若是超過則會出現超過的消費者分配不到分區的狀況,由於分區一旦分配給消費者就不會再變更,除非組內消費者個數出現變更而發生重平衡。

5.2 消費位移

5.2.1 消費位移主題

Kafka 0.9開始將消費端的位移信息保存在集羣的內部主題(__consumer_offsets)中,該主題默認爲50個分區,每條日誌項的格式都是:<TopicPartition, OffsetAndMetadata>,其key爲主題分區主要存放主題、分區以及消費組信息,value爲OffsetAndMetadata對象主要包括位移、位移提交時間、自定義元數據等信息。只有消費組往kafka中提交位移纔會往這個主題中寫入數據,若是消費端將消費位移信息保存在外部存儲,則不會有消費位移信息,下面能夠經過kafka-console-consumer.sh腳本查看主題消費位移信息。

\# bin/kafka-console-consumer.sh --topic \_\_consumer\_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning  
​  
\[consumer-group01,nginx\_access\_log,2\]::OffsetAndMetadata(offset\=17104625, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)  
\[consumer-group01,nginx\_access\_log,1\]::OffsetAndMetadata(offset\=17103024, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)  
\[consumer-group01,nginx\_access\_log,0\]::OffsetAndMetadata(offset\=17107771, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)

5.2.2 消費位移自動提交

消費端能夠經過設置參數enable.auto.commit來控制是自動提交仍是手動,若是值爲true則表示自動提交,在消費端的後臺會定時的提交消費位移信息,時間間隔由auto.commit.interval.ms(默認爲5秒)。

可是若是設置爲自動提交會存在如下幾個問題:

  1. 可能存在重複的位移數據提交到消費位移主題中,由於每隔5秒會往主題中寫入一條消息,無論是否有新的消費記錄,這樣就會產生大量的同key消息,其實只須要一條,所以須要依賴前面提到日誌壓縮策略來清理數據。
  2. 重複消費,假設位移提交的時間間隔爲5秒,那麼在5秒內若是發生了rebalance,則全部的消費者會從上一次提交的位移處開始消費,那麼期間消費的數據則會再次被消費。

5.2.3 消費位移手動提交

手動提交須要將enable.auto.commit值設置爲false,而後由業務消費端來控制消費進度,手動提交又分爲如下三種類型:

  • 同步手動提交位移:若是調用的是同步提交方法commitSync(),則會將poll拉取的最新位移提交到kafka集羣,提交成功前會一直等待提交成功。
  • 異步手動提交位移:調用異步提交方法commitAsync(),在調用該方法以後會馬上返回,不會阻塞,而後能夠經過回調函數執行相關的異常處理邏輯。
  • 指定提交位移:指定位移提交也分爲異步跟同步,傳參爲Map<TopicPartition, OffsetAndMetadata>,其中key爲消息分區,value爲位移對象。

5.3 分組協調者

分組協調者(Group Coordinator)是一個服務,kafka集羣中的每一個節點在啓動時都會啓動這樣一個服務,該服務主要是用來存儲消費分組相關的元數據信息,每一個消費組均會選擇一個協調者來負責組內各個分區的消費位移信息存儲,選擇的主要步驟以下:

  • 首選肯定消費組的位移信息存入哪一個分區:前面提到默認的__consumer_offsets主題分區數爲50,經過如下算法能夠計算出對應消費組的位移信息應該存入哪一個分區 partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 其中groupId爲消費組的id,這個由消費端指定,groupMetadataTopicPartitionCount爲主題分區數。
  • 根據partition尋找該分區的leader所對應的節點broker,該broker的Coordinator即爲該消費組的Coordinator。

5.4 重平衡機制

5.4.1 重平衡發生場景

如下幾種場景均會觸發重平衡操做:

  1. 新的消費者加入到消費組中。
  2. 消費者被動下線。好比消費者長時間的GC、網絡延遲致使消費者長時間未向Group Coordinator發送心跳請求,均會認爲該消費者已經下線並踢出。
  3. 消費者主動退出消費組。
  4. 消費組訂閱的任意一個主題分區數出現變化。
  5. 消費者取消某個主題的訂閱。

5.4.2 重平衡操做流程

重平衡的實現能夠分爲如下幾個階段:

  1. 查找Group Coordinator:消費者會從kafka集羣中選擇一個負載最小的節點發送GroupCoorinatorRequest請求,並處理返回響應GroupCoordinatorResponse。其中請求參數中包含消費組的id,響應中包含Coordinator所在節點id、host以及端口號信息。
  2. Join group:當消費者拿到協調者的信息以後會往協調者發送加入消費組的請求JoinGroupRequest,當全部的消費者都發送該請求以後,協調者會從中選擇一個消費者做爲leader角色,而後將組內成員信息、訂閱等信息發給消費者(響應格式JoinGroupResponse見下表),leader負責消費方案的分配。

JoinGroupRequest請求數據格式

名稱 類型 說明
group_id String 消費者id
seesion_timeout int 協調者超過session_timeout指定的時間沒有收到心跳消息,則認爲該消費者下線
member_id String 協調者分配給消費者的id
protocol_type String 消費組實現的協議,默認爲sonsumer
group_protocols List 包含此消費者支持的所有PartitionAssignor類型
protocol_name String PartitionAssignor類型
protocol_metadata byte[] 針對不一樣PartitionAssignor類型序列化後的消費者訂閱信息,包含用戶自定義數據userData

JoinGroupResponse響應數據格式

名稱 類型 說明
error_code short 錯誤碼
generation_id int 協調者分配的年代信息
group_protocol String 協調者選擇的PartitionAssignor類型
leader_id String Leader的member_id
member_id String 協調者分配給消費者的id
members Map集合 消費組中所有的消費者訂閱信息
member_metadata byte[] 對應消費者的訂閱信息
  1. Synchronizing Group State階段:當leader消費者完成消費方案的分配後會發送SyncGroupRequest請求給協調者,其餘非leader節點也會發送該請求,只是請求參數爲空,而後協調者將分配結果做爲響應SyncGroupResponse發給各個消費者,請求及相應的數據格式以下表所示:

SyncGroupRequest請求數據格式

名稱 類型 說明
group_id String 消費組的id
generation_id int 消費組保存的年代信息
member_id String 協調者分配的消費者id
member_assignment byte[] 分區分配結果

SyncGroupResponse響應數據格式

名稱 類型 說明
error_code short 錯誤碼
member_assignment byte[] 分配給當前消費者的分區

5.4.3 分區分配策略

Kafka提供了三個分區分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面簡單介紹下各個算法的實現。

  1. RangeAssignor:kafka默認會採用此策略進行分區分配,主要流程以下

    • 將全部訂閱主題下的分區進行排序獲得集合TP={TP0,Tp1,...,TPN+1}
    • 對消費組中的全部消費者根據名字進行字典排序獲得集合CG={C0,C1,...,CM+1}
    • 計算D=N/MR=N%M
    • 消費者Ci獲取消費分區起始位置=D*i+min(i,R),Ci獲取的分區總數=D+(if (i+1>R)0 else 1)。
假設一個消費組中存在兩個消費者{C0,C1},該消費組訂閱了三個主題{T1,T2,T3},每一個主題分別存在三個分區,一共就有9個分區{TP1,TP2,...,TP9}。經過以上算法咱們能夠獲得D=4,R=1,那麼消費組C0將消費的分區爲{TP1,TP2,TP3,TP4,TP5},C1將消費分區{TP6,TP7,TP8,TP9}。這裏存在一個問題,若是不能均分,那麼前面的幾個消費者將會多消費一個分區。
  1. RoundRobinAssignor:使用該策略須要知足如下兩個條件:1) 消費組中的全部消費者應該訂閱主題相同;2) 同一個消費組的全部消費者在實例化時給每一個主題指定相同的流數。

    • 對全部主題的全部分區根據主題+分區獲得的哈希值進行排序。
    • 對全部消費者按字典排序。
    • 經過輪詢的方式將分區分配給消費者。
  2. StickyAssignor:該分配方式在0.11版本開始引入,主要是保證如下特性:1) 儘量的保證分配均衡;2) 當從新分配時,保留儘量多的現有分配。其中第一條的優先級要大於第二條。

6. 總結

在本文中,咱們圍繞Kafka的特性,詳細介紹了其原理實現,經過主題與日誌的深刻剖析,瞭解了Kafka內部消息的存放、檢索以及刪除機制。副本系統中的ISR概念的引入解決同步副本與異步複製兩種方案各自的缺陷,lead epoch機制的出現解決了數據丟失以及數據不一致問題。生產端的分區選擇算法實現了數據均衡,冪等特性的支持則解決了以前存在的重複消息問題。

最後介紹了消費端的相關原理,消費組機制實現了消費端的消息隔離,既有廣播也有獨佔的場景支持,而重平衡機制則保證的消費端的健壯性與擴展性。

參考文獻

[1] 徐郡明.Apach Kafka 源碼剖析[M].北京.電子工業出版社,2017.
[2] Kafka深度解析.
[3] 深刻淺出理解基於 Kafka 和 ZooKeeper 的分佈式消息隊列.
[4] Kafka 事務性之冪等性實現.
[5] Kafka水位(high watermark)與leader epoch的討論.
[6] kafka消費者如何分配分區.

相關文章
相關標籤/搜索