本文來自OPPO互聯網技術團隊,轉載請註名做者。同時歡迎關注OPPO互聯網技術團隊的公衆號:OPPO_tech,與你分享OPPO前沿互聯網技術及活動。html
Kafka是一個分佈式的基於發佈、訂閱的消息系統,具備着高吞吐、高容錯、高可靠以及高性能等特性,主要用於應用解耦、流量削峯、異步消息等場景。nginx
爲了讓你們更加深刻的瞭解Kafka內部實現原理,文中將會從主題與日誌開始介紹消息的存儲、刪除以及檢索,而後介紹其副本機制的實現原理,最後介紹生產與消費的實現原理以便更合理的應用於實際業務。git
另外,本文較長,建議點贊後再慢慢看 :)算法
Kafka是一個分佈式的基於發佈、訂閱的消息系統,有着強大的消息處理能力,相比與其餘消息系統,具備如下特性:bootstrap
正是因其具備這些的優秀特性而普遍用於應用解耦、流量削峯、異步消息等場景,好比消息中間件、日誌聚合、流處理等等。segmentfault
本文將從如下幾個方面去介紹kafka:緩存
主題是存儲消息的一個邏輯概念,能夠簡單理解爲一類消息的集合,由使用方去建立。Kafka中的主題通常會有多個訂閱者去消費對應主題的消息,也能夠存在多個生產者往主題中寫入消息。服務器
每一個主題又能夠劃分紅多個分區,每一個分區存儲不一樣的消息。當消息添加至分區時,會爲其分配一個位移offset(從0開始遞增),並保證分區上惟一,消息在分區上的順序由offset保證,即同一個分區內的消息是有序的,以下圖所示網絡
同一個主題的不一樣分區會分配在不一樣的節點上(broker),分區時保證Kafka集羣具備水平擴展的基礎。session
以主題nginx_access_log
爲例,分區數爲3,如上圖所示。分區在邏輯上對應一個日誌(Log),物理上對應的是一個文件夾。
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-0/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-1/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-2/
消息寫入分區時,其實是將消息寫入分區所在的文件夾中。日誌又分紅多個分片(Segment),每一個分片由日誌文件與索引文件組成,每一個分片大小是有限的(在kafka集羣的配置文件log.segment.bytes
配置,默認爲1073741824byte,即1GB),當分片大小超過限制則會從新建立一個新的分片,外界消息的寫入只會寫入最新的一個分片(順序IO)。
\-rw-r--r-- 1 root root 1835920 10月 11 19:18 00000000000000000000.index \-rw-r--r-- 1 root root 1073741684 10月 11 19:18 00000000000000000000.log \-rw-r--r-- 1 root root 2737884 10月 11 19:18 00000000000000000000.timeindex \-rw-r--r-- 1 root root 1828296 10月 11 19:30 00000000000003257573.index \-rw-r--r-- 1 root root 1073741513 10月 11 19:30 00000000000003257573.log \-rw-r--r-- 1 root root 2725512 10月 11 19:30 00000000000003257573.timeindex \-rw-r--r-- 1 root root 1834744 10月 11 19:42 00000000000006506251.index \-rw-r--r-- 1 root root 1073741771 10月 11 19:42 00000000000006506251.log \-rw-r--r-- 1 root root 2736072 10月 11 19:42 00000000000006506251.timeindex \-rw-r--r-- 1 root root 1832152 10月 11 19:54 00000000000009751854.index \-rw-r--r-- 1 root root 1073740984 10月 11 19:54 00000000000009751854.log \-rw-r--r-- 1 root root 2731572 10月 11 19:54 00000000000009751854.timeindex \-rw-r--r-- 1 root root 1808792 10月 11 20:06 00000000000012999310.index \-rw-r--r-- 1 root root 1073741584 10月 11 20:06 00000000000012999310.log \-rw-r--r-- 1 root root 10 10月 11 19:54 00000000000012999310.snapshot \-rw-r--r-- 1 root root 2694564 10月 11 20:06 00000000000012999310.timeindex \-rw-r--r-- 1 root root 10485760 10月 11 20:09 00000000000016260431.index \-rw-r--r-- 1 root root 278255892 10月 11 20:09 00000000000016260431.log \-rw-r--r-- 1 root root 10 10月 11 20:06 00000000000016260431.snapshot \-rw-r--r-- 1 root root 10485756 10月 11 20:09 00000000000016260431.timeindex \-rw-r--r-- 1 root root 8 10月 11 19:03 leader-epoch-checkpoint
一個分片包含多個不一樣後綴的日誌文件,分片中的第一個消息的offset將做爲該分片的基準偏移量,偏移量固定長度爲20,不夠前面補齊0,而後將其做爲索引文件以及日誌文件的文件名,如00000000000003257573.index
、00000000000003257573.log
、00000000000003257573.timeindex
、相同文件名的文件組成一個分片(忽略後綴名),除了.index
、.timeindex
、 .log
後綴的日誌文件外其餘日誌文件,對應含義以下:
文件類型 | 做用 |
---|---|
.index | 偏移量索引文件,記錄<相對位移,起始地址>映射關係,其中相對位移表示該分片的第一個消息,從1開始計算,起始地址表示對應相對位移消息在分片.log文件的起始地址 |
.timeindex | 時間戳索引文件,記錄<時間戳,相對位移>映射關係 |
.log | 日誌文件,存儲消息的詳細信息 |
.snaphot | 快照文件 |
.deleted | 分片文件刪除時會先將該分片的全部文件加上.delete後綴,而後有delete-file 任務延遲刪除這些文件(file.delete.delay.ms能夠設置延時刪除的的時間) |
.cleaned | 日誌清理時臨時文件 |
.swap | Log Compaction 以後的臨時文件 |
.leader-epoch-checkpoint |
首先介紹下.index
文件,這裏以文件00000000000003257573.index
爲例,首先咱們能夠經過如下命令查看該索引文件的內容,咱們能夠看到輸出結構爲<offset,position>,實際上索引文件中保存的並非offset而是相對位移,好比第一條消息的相對位移則爲0,格式化輸出時加上了基準偏移量,如上圖所示,<114,17413>表示該分片相對位移爲114的消息,其位移爲3257573+114,即3257687,position表示對應offset在.log
文件的物理地址,經過.index
索引文件則能夠獲取對應offset所在的物理地址。索引採用稀疏索引的方式構建,並不保證分片中的每一個消息都在索引文件有映射關係(.timeindex
索引也是相似),主要是爲了節省磁盤空間、內存空間,由於索引文件最終會映射到內存中。
\# 查看該分片索引文件的前10條記錄 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |head \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index offset: 3257687 position: 17413 offset: 3257743 position: 33770 offset: 3257799 position: 50127 offset: 3257818 position: 66484 offset: 3257819 position: 72074 offset: 3257871 position: 87281 offset: 3257884 position: 91444 offset: 3257896 position: 95884 offset: 3257917 position: 100845 \# 查看該分片索引文件的後10條記錄 $ bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.index |tail \-n 10 offset: 6506124 position: 1073698512 offset: 6506137 position: 1073702918 offset: 6506150 position: 1073707263 offset: 6506162 position: 1073711499 offset: 6506176 position: 1073716197 offset: 6506188 position: 1073720433 offset: 6506205 position: 1073725654 offset: 6506217 position: 1073730060 offset: 6506229 position: 1073734174 offset: 6506243 position: 1073738288
好比查看offset爲6506155
的消息:首先根據offset找到對應的分片,65061所對應的分片爲00000000000003257573
,而後經過二分法在00000000000003257573.index
文件中找到不大於6506155的最大索引值,獲得<offset: 6506150, position: 1073707263>,而後從00000000000003257573.log
的1073707263位置開始順序掃描找到offset爲650155的消息
Kafka從0.10.0.0版本起,爲分片日誌文件中新增了一個.timeindex
的索引文件,能夠根據時間戳定位消息。一樣咱們能夠經過腳本kafka-dump-log.sh
查看時間索引的文件內容。
\# 查看該分片時間索引文件的前10條記錄 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |head \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex timestamp: 1570792689308 offset: 3257685 timestamp: 1570792689324 offset: 3257742 timestamp: 1570792689345 offset: 3257795 timestamp: 1570792689348 offset: 3257813 timestamp: 1570792689357 offset: 3257867 timestamp: 1570792689361 offset: 3257881 timestamp: 1570792689364 offset: 3257896 timestamp: 1570792689368 offset: 3257915 timestamp: 1570792689369 offset: 3257927 \# 查看該分片時間索引文件的前10條記錄 bin/kafka-dump-log.sh \--files /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex |tail \-n 10 Dumping /tmp/kafka-logs/nginx\_access\_log-1/00000000000003257573.timeindex timestamp: 1570793423474 offset: 6506136 timestamp: 1570793423477 offset: 6506150 timestamp: 1570793423481 offset: 6506159 timestamp: 1570793423485 offset: 6506176 timestamp: 1570793423489 offset: 6506188 timestamp: 1570793423493 offset: 6506204 timestamp: 1570793423496 offset: 6506214 timestamp: 1570793423500 offset: 6506228 timestamp: 1570793423503 offset: 6506240 timestamp: 1570793423505 offset: 6506248
好比我想查看時間戳1570793423501
開始的消息:1.首先定位分片,將1570793423501
與每一個分片的最大時間戳進行對比(最大時間戳取時間索引文件的最後一條記錄時間,若是時間爲0則取該日誌分段的最近修改時間),直到找到大於或等於1570793423501
的日誌分段,所以會定位到時間索引文件00000000000003257573.timeindex
,其最大時間戳爲1570793423505
;2.經過二分法找到大於或等於1570793423501
的最大索引項,即<timestamp: 1570793423503 offset: 6506240>(6506240爲offset,相對位移爲3247667);3.根據相對位移3247667去索引文件中找到不大於該相對位移的最大索引值<3248656,1073734174>;4.從日誌文件00000000000003257573.log
的1073734174位置處開始掃描,查找不小於1570793423501
的數據。
與其餘消息中間件不一樣的是,Kafka集羣中的消息不會由於消費與否而刪除,跟日誌同樣消息最終會落盤,並提供對應的策略週期性(經過參數log.retention.check.interval.ms來設置,默認爲5分鐘)執行刪除或者壓縮操做(broker配置文件log.cleanup.policy
參數若是爲「delete」則執行刪除操做,若是爲「compact」則執行壓縮操做,默認爲「delete」)。
參數 | 默認值 | 說明 |
---|---|---|
log.retention.hours | 168 | 日誌保留時間(小時) |
log.retention.minutes | 無 | 日誌保留時間(分鐘),優先級大於小時 |
log.retention.ms | 無 | 日誌保留時間(毫秒),優先級大於分鐘 |
當消息在集羣保留時間超過設定閾值(log.retention.hours,默認爲168小時,即七天),則須要進行刪除。這裏會根據分片日誌的最大時間戳來判斷該分片的時間是否知足刪除條件,最大時間戳首先會選取時間戳索引文件中的最後一條索引記錄,若是對應的時間戳值大於0則取該值,不然爲最近一次修改時間。
這裏不直接選取最後修改時間的緣由是避免分片日誌的文件被無心篡改而致使其時間不許。
若是剛好該分區下的全部日誌分片均已過時,那麼會先生成一個新的日誌分片做爲新消息的寫入文件,而後再執行刪除參數。
參數 | 默認值 | 說明 |
---|---|---|
log.retention.bytes | 1073741824(即1G),默認未開啓,即無窮大 | 日誌文件總大小,並非指單個分片的大小 |
log.segment.bytes | 1073741824(即1G) | 單個日誌分片大小 |
首先會計算待刪除的日誌大小diff
(totalSize-log.rentention.bytes),而後從最舊的一個分片開始查看能夠執行刪除操做的文件集合(若是diff-segment.size>=0
,則知足刪除條件),最後執行刪除操做。
通常狀況下,日誌文件的起始偏移量(logStartOffset)會等於第一個日誌分段的baseOffset,可是其值會由於刪除消息請求而增加,logStartOffset的值其實是日誌集合中的最小消息,而小於這個值的消息都會被清理掉。如上圖所示,咱們假設logStartOffset=7421048,日誌刪除流程以下:
前面提到當broker配置文件log.cleanup.policy
參數值設置爲「compact」時,則會執行壓縮操做,這裏的壓縮跟普通意義的壓縮不同,這裏的壓縮是指將相同key的消息只保留最後一個版本的value值,以下圖所示,壓縮以前offset是連續遞增,壓縮以後offset遞增可能不連續,只保留5條消息記錄。
Kafka日誌目錄下cleaner-offset-checkpoint
文件,用來記錄每一個主題的每一個分區中已經清理的偏移量,經過這個偏移量能夠將分區中的日誌文件分紅兩個部分:clean
表示已經壓縮過;dirty
表示還未進行壓縮,以下圖所示(active segment不會參與日誌的壓縮操做,由於會有新的數據寫入該文件)。
\-rw-r--r-- 1 root root 4 10月 11 19:02 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-0/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-1/ drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx\_access\_log-2/ \-rw-r--r-- 1 root root 0 9月 18 09:50 .lock \-rw-r--r-- 1 root root 4 10月 16 11:19 log-start-offset-checkpoint \-rw-r--r-- 1 root root 54 9月 18 09:50 meta.properties \-rw-r--r-- 1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint \-rw-r--r-- 1 root root 1518 10月 16 11:19 replication-offset-checkpoint #cat cleaner-offset-checkpoint nginx\_access\_log 0 5033168 nginx\_access\_log 1 5033166 nginx\_access\_log 2 5033168
日誌壓縮時會根據dirty部分數據佔日誌文件的比例(cleanableRatio)來判斷優先壓縮的日誌,而後爲dirty部分的數據創建key與offset映射關係(保存對應key的最大offset)存入SkimpyoffsetMap中,而後複製segment分段中的數據,只保留SkimpyoffsetMap中記錄的消息,壓縮以後的相關日誌文件大小會減小,爲了不出現太小的日誌文件與索引文件,壓縮時會對全部的segment進行分組(一個組的分片大小不會超過設置的log.segment.bytes
值大小),同一個分組的多個分片日誌壓縮以後變成一個分片。
如上圖所示,全部消息都還沒壓縮前clean checkpoint
值爲0,表示該分區的數據還沒進行壓縮,第一次壓縮後,以前每一個分片的日誌文件大小都有所減小,同時會移動clean checkpoint
的位置到這一次壓縮結束的offset值。第二次壓縮時,會將前兩個分片{0.5GB,0.4GB}組成一個分組,{0.7GB,0.2GB}組成一個分組進行壓縮,以此類推。
如上圖所示,日誌壓縮的主要流程以下:
deleteHorizonMs
值:當某個消息的value值爲空時,該消息會被保留一段時間,超時以後會在下一次的得日誌壓縮中被刪除,因此這裏會計算deleteHorizonMs
,根據該值肯定能夠刪除value值爲空的日誌分片。(deleteHorizonMs = clean部分的最後一個分片的lastModifiedTime - deleteRetionMs
,deleteRetionMs經過配置文件log.cleaner.delete.retention.ms配置,默認爲24小時)。firstDirtyOffset
表示dirty的起始位移,通常會等於clear checkpoint
值,firstUncleanableOffset
表示不能清理的最小位移,通常會等於活躍分片的baseOffset,而後從firstDirtyOffset位置開始遍歷日誌分片,並填充key與offset的映射關係至SkimpyoffsetMap中,當該map被填充滿或到達上限firstUncleanableOffset
時,就能夠肯定日誌壓縮上限endOffset
。kafka支持消息的冗餘備份,能夠設置對應主題的副本數(--replication-factor
參數設置主題的副本數可在建立主題的時候指定,offsets.topic.replication.factor
設置消費主題_consumer_offsets
副本數,默認爲3),每一個副本包含的消息同樣(但不是徹底一致,可能從副本的數據較主副本稍微有些落後)。每一個分區的副本集合中會有一個副本被選舉爲主副本(leader),其餘爲從副本,全部的讀寫請求由主副本對外提供,從副本負責將主副本的數據同步到本身所屬分區,若是主副本所在分區宕機,則會從新選舉出新的主副本對外提供服務。
ISR(In-Sync Replica)集合,表示目前能夠用的副本集合,每一個分區中的leader副本會維護此分區的ISR集合。這裏的可用是指從副本的消息量與主副本的消息量相差不大,加入至ISR集合中的副本必須知足如下幾個條件:
replica.lag.max.messages
)或者副本的LEO落後於主副本的LEO時長不大於設定閾值(replica.lag.time.max.ms
),官方推薦使用後者判斷,並在新版本kafka0.10.0移除了replica.lag.max.messages
參數。若是從副本不知足以上的任意條件,則會將其提出ISR集合,當其再次知足以上條件以後又會被從新加入集合中。ISR的引入主要是解決同步副本與異步複製兩種方案各自的缺陷(同步副本中若是有個副本宕機或者超時就會拖慢該副本組的總體性能;若是僅僅使用異步副本,當全部的副本消息均遠落後於主副本時,一旦主副本宕機從新選舉,那麼就會存在消息丟失狀況)
HW(High Watermark)是一個比較特殊的offset標記,消費端消費時只能拉取到小於HW的消息而HW及以後的消息對於消費者來講是不可見的,該值由主副本管理,當ISR集合中的所有從副本都拉取到HW指定消息以後,主副本會將HW值+1,即指向下一個offset位移,這樣能夠保證HW以前消息的可靠性。
LEO(Log End Offset)表示當前副本最新消息的下一個offset,全部副本都存在這樣一個標記,若是是主副本,當生產端往其追加消息時,會將其值+1。當從副本從主副本成功拉取到消息時,其值也會增長。
從副本的數據是來自主副本,經過向主副本發送fetch請求獲取數據,從副本的LEO值會保存在兩個地方,一個是自身所在的節點),一個是主副本所在節點,自身節點保存LEO主要是爲了更新自身的HW值,主副本保存從副本的LEO也是爲了更新其HW。當從副本每寫入一條新消息就會增長其自身的LEO,主副本收到從副本的fetch請求,會先從自身的日誌中讀取對應數據,在數據返回給從副本以前會先去更新其保存的從副本LEO值。一旦從副本數據寫入完成,就會嘗試更新本身的HW值,比較LEO與fetch響應中主副本的返回HW,取最小值做爲新的HW值。
主副本有日誌寫入時就會更新其自身的LEO值,與從副本相似。而主副本的HW值是分區的HW值,決定分區數據對應消費端的可見性,如下四種狀況,主副本會嘗試更新其HW值:
前面是去嘗試更新HW,可是不必定會更新,主副本上保存着從副本的LEO值與自身的LEO值,這裏會比較全部知足條件的副本LEO值,並選擇最小的LEO值最爲分區的HW值,其中知足條件的副本是指知足如下兩個條件之一:
前面提到若是僅僅依賴HW來進行日誌截斷以及水位的判斷會存在問題,如上圖所示,假定存在兩個副本A、副本B,最開始A爲主副本,B爲從副本,且參數min.insync.replicas=1
,即ISR只有一個副本時也會返回成功:
min(LEO,LEOB)=1
,即不須要更新,而後將消息1以及當前分區HW=1返回給從副本B,從副本B收到響應以後寫入日誌並更新LEO=2,而後更新其HW=1,雖然已經寫入了兩條消息,可是HW值須要在下一輪的請求才會更新爲2。
如圖所示,假定存在兩個副本A、副本B,最開始A爲主副本,B爲從副本,且參數min.insync.replicas=1
,即ISR只有一個副本時也會返回成功:
HW值被用於衡量副本備份成功與否以及出現失敗狀況時候的日誌截斷依據可能會致使數據丟失與數據不一致狀況,所以在新版的Kafka(0.11.0.0)引入了leader epoch概念,leader epoch表示一個鍵值對<epoch, offset>,其中epoch表示leader主副本的版本號,從0開始編碼,當leader每變動一次就會+1,offset表示該epoch版本的主副本寫入第一條消息的位置,好比<0,0>表示第一個主副本從位移0開始寫入消息,<1,100>表示第二個主副本版本號爲1並從位移100開始寫入消息,主副本會將該信息保存在緩存中並按期寫入到checkpoint文件中,每次發生主副本切換都會去從緩存中查詢該信息,下面簡單介紹下leader epoch的工做原理:
當某個副本宕機重啓以後,會進行如下操做:
下面看下leader epoch機制如何避免前面提到的兩種異常場景
offsetsForLeaderEpochRequest
,epoch主從副本相等,則A返回當前的LEO=2,從副本B中沒有任何大於2的位移,所以不須要截斷。從上能夠看出引入leader epoch值以後避免了前面提到的數據丟失狀況,可是這裏須要注意的是若是在上面的第一步,從副本B起來以後向主副本A發送offsetsForLeaderEpochRequest
請求失敗,即主副本A同時也宕機了,那麼消息1就會丟失,具體可見下面數據不一致場景中有提到。
offsetsForLeaderEpochRequest
請求,因爲主副本也宕機了,所以副本B將變成主副本並將消息1截斷,此時接受到新消息1的寫入。offsetsForLeaderEpochRequest
請求,請求的epoch值小於主副本B,所以主副本B會返回epoch=1時的開始位移,即lastoffset=1,所以從副本A會截斷消息1。能夠看出epoch的引入避免的數據不一致,可是兩個副本均宕機,則仍是存在數據丟失的場景,前面的全部討論都是創建在min.insync.replicas=1
的前提下,所以須要在數據的可靠性與速度方面作權衡。
生產者的做用主要是生產消息,將消息存入到Kafka對應主題的分區中,具體某個消息應該存入哪一個分區,有如下三個策略決定(優先級由上到下,依次遞減):
key
,則會根據key
的哈希值選擇分區。生產端往kafka集羣發送消息時,能夠經過request.required.acks
參數來設置數據的可靠性級別
min.insync.replicas
值設置爲1,那麼在這種狀況下容許ISR集合只有一個副本,所以也會存在數據丟失的狀況。所謂的冪等性,是指一次或者屢次請求某一個資源對於資源自己應該具備一樣的結果(網絡超時等問題除外),通俗一點的理解就是同一個操做任意執行屢次產生的影響或效果與一次執行影響相同,冪等的關鍵在於服務端可否識別出請求是否重複,而後過濾掉這些重複請求,一般狀況下須要如下信息來實現冪等特性:
kafka中Producer端的冪等性是指當發送同一條消息時,消息在集羣中只會被持久化一次,其冪等是在如下條件中才成立:
若是要支持垮會話或者垮多個消息分區的狀況,則須要使用kafka的事務性來實現。
爲了實現生成端的冪等語義,引入了Producer ID(PID)與Sequence Number的概念:
下面簡單介紹下支持冪等的消息發送端工做流程
發送線程在調用sendProducerData()
方法發送數據時,會進行如下判斷:
服務端(broker)在收到生產端發送的數據寫請求以後,會進行一些判斷來決定是否能夠寫入數據,這裏也主要介紹關於冪等相關的操做流程。
CLUSTER_AUTHORIZATION_FAILED
。若是有PID且非重複batch,則進行如下操做:
消費者主要是從Kafka集羣拉取消息,而後進行相關的消費邏輯,消費者的消費進度由其自身控制,增長消費的靈活性,好比消費端能夠控制重複消費某些消息或者跳過某些消息進行消費。
多個消費者能夠組成一個消費組,每一個消費者只屬於一個消費組。消費組訂閱主題的每一個分區只會分配給該消費組中的某個消費者處理,不一樣的消費組之間彼此隔離無依賴。同一個消息只會被消費組中的一個消費者消費,若是想要讓同一個消息被多個消費者消費,那麼每一個消費者須要屬於不一樣的消費組,且對應消費組中只有該一個消費者,消費組的引入能夠實現消費的「獨佔」或「廣播」效果。
如圖所示,消費組1中包含兩個消費者,其中消費者1分配消費分區0,消費者2分配消費分區1與分區2。此外消費組的引入還支持消費者的水平擴展及故障轉移,好比從上圖咱們能夠看出消費者2的消費能力不足,相對消費者1來講消費進度比較落後,咱們能夠往消費組裏面增長一個消費者以提升其總體的消費能力,以下圖所示。
假設消費者1所在機器出現宕機,消費組會發送重平衡,假設將分區0分配給消費者2進行消費,以下圖所示。同個消費組中消費者的個數不是越多越好,最大不能超過主題對應的分區數,若是超過則會出現超過的消費者分配不到分區的狀況,由於分區一旦分配給消費者就不會再變更,除非組內消費者個數出現變更而發生重平衡。
Kafka 0.9開始將消費端的位移信息保存在集羣的內部主題(__consumer_offsets)中,該主題默認爲50個分區,每條日誌項的格式都是:<TopicPartition, OffsetAndMetadata>,其key爲主題分區主要存放主題、分區以及消費組信息,value爲OffsetAndMetadata對象主要包括位移、位移提交時間、自定義元數據等信息。只有消費組往kafka中提交位移纔會往這個主題中寫入數據,若是消費端將消費位移信息保存在外部存儲,則不會有消費位移信息,下面能夠經過kafka-console-consumer.sh
腳本查看主題消費位移信息。
\# bin/kafka-console-consumer.sh --topic \_\_consumer\_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning \[consumer-group01,nginx\_access\_log,2\]::OffsetAndMetadata(offset\=17104625, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None) \[consumer-group01,nginx\_access\_log,1\]::OffsetAndMetadata(offset\=17103024, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None) \[consumer-group01,nginx\_access\_log,0\]::OffsetAndMetadata(offset\=17107771, leaderEpoch\=Optional.\[0\], metadata\=, commitTimestamp\=1573475863555, expireTimestamp\=None)
消費端能夠經過設置參數enable.auto.commit
來控制是自動提交仍是手動,若是值爲true
則表示自動提交,在消費端的後臺會定時的提交消費位移信息,時間間隔由auto.commit.interval.ms
(默認爲5秒)。
可是若是設置爲自動提交會存在如下幾個問題:
手動提交須要將enable.auto.commit
值設置爲false
,而後由業務消費端來控制消費進度,手動提交又分爲如下三種類型:
commitSync()
,則會將poll拉取的最新位移提交到kafka集羣,提交成功前會一直等待提交成功。commitAsync()
,在調用該方法以後會馬上返回,不會阻塞,而後能夠經過回調函數執行相關的異常處理邏輯。分組協調者(Group Coordinator)是一個服務,kafka集羣中的每一個節點在啓動時都會啓動這樣一個服務,該服務主要是用來存儲消費分組相關的元數據信息,每一個消費組均會選擇一個協調者來負責組內各個分區的消費位移信息存儲,選擇的主要步驟以下:
partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
其中groupId
爲消費組的id,這個由消費端指定,groupMetadataTopicPartitionCount
爲主題分區數。如下幾種場景均會觸發重平衡操做:
重平衡的實現能夠分爲如下幾個階段:
Group Coordinator
:消費者會從kafka集羣中選擇一個負載最小的節點發送GroupCoorinatorRequest
請求,並處理返回響應GroupCoordinatorResponse
。其中請求參數中包含消費組的id,響應中包含Coordinator所在節點id、host以及端口號信息。Join group
:當消費者拿到協調者的信息以後會往協調者發送加入消費組的請求JoinGroupRequest
,當全部的消費者都發送該請求以後,協調者會從中選擇一個消費者做爲leader角色,而後將組內成員信息、訂閱等信息發給消費者(響應格式JoinGroupResponse
見下表),leader負責消費方案的分配。JoinGroupRequest
請求數據格式
名稱 | 類型 | 說明 |
---|---|---|
group_id | String | 消費者id |
seesion_timeout | int | 協調者超過session_timeout指定的時間沒有收到心跳消息,則認爲該消費者下線 |
member_id | String | 協調者分配給消費者的id |
protocol_type | String | 消費組實現的協議,默認爲sonsumer |
group_protocols | List | 包含此消費者支持的所有PartitionAssignor類型 |
protocol_name | String | PartitionAssignor類型 |
protocol_metadata | byte[] | 針對不一樣PartitionAssignor類型序列化後的消費者訂閱信息,包含用戶自定義數據userData |
JoinGroupResponse
響應數據格式
名稱 | 類型 | 說明 |
---|---|---|
error_code | short | 錯誤碼 |
generation_id | int | 協調者分配的年代信息 |
group_protocol | String | 協調者選擇的PartitionAssignor類型 |
leader_id | String | Leader的member_id |
member_id | String | 協調者分配給消費者的id |
members | Map集合 | 消費組中所有的消費者訂閱信息 |
member_metadata | byte[] | 對應消費者的訂閱信息 |
Synchronizing Group State
階段:當leader消費者完成消費方案的分配後會發送SyncGroupRequest
請求給協調者,其餘非leader節點也會發送該請求,只是請求參數爲空,而後協調者將分配結果做爲響應SyncGroupResponse
發給各個消費者,請求及相應的數據格式以下表所示:SyncGroupRequest
請求數據格式
名稱 | 類型 | 說明 |
---|---|---|
group_id | String | 消費組的id |
generation_id | int | 消費組保存的年代信息 |
member_id | String | 協調者分配的消費者id |
member_assignment | byte[] | 分區分配結果 |
SyncGroupResponse
響應數據格式
名稱 | 類型 | 說明 |
---|---|---|
error_code | short | 錯誤碼 |
member_assignment | byte[] | 分配給當前消費者的分區 |
Kafka提供了三個分區分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面簡單介紹下各個算法的實現。
RangeAssignor:kafka默認會採用此策略進行分區分配,主要流程以下
TP={TP0,Tp1,...,TPN+1}
。CG={C0,C1,...,CM+1}
。D=N/M
,R=N%M
。假設一個消費組中存在兩個消費者{C0,C1},該消費組訂閱了三個主題{T1,T2,T3},每一個主題分別存在三個分區,一共就有9個分區{TP1,TP2,...,TP9}。經過以上算法咱們能夠獲得D=4,R=1,那麼消費組C0將消費的分區爲{TP1,TP2,TP3,TP4,TP5},C1將消費分區{TP6,TP7,TP8,TP9}。這裏存在一個問題,若是不能均分,那麼前面的幾個消費者將會多消費一個分區。
RoundRobinAssignor:使用該策略須要知足如下兩個條件:1) 消費組中的全部消費者應該訂閱主題相同;2) 同一個消費組的全部消費者在實例化時給每一個主題指定相同的流數。
在本文中,咱們圍繞Kafka的特性,詳細介紹了其原理實現,經過主題與日誌的深刻剖析,瞭解了Kafka內部消息的存放、檢索以及刪除機制。副本系統中的ISR概念的引入解決同步副本與異步複製兩種方案各自的缺陷,lead epoch機制的出現解決了數據丟失以及數據不一致問題。生產端的分區選擇算法實現了數據均衡,冪等特性的支持則解決了以前存在的重複消息問題。
最後介紹了消費端的相關原理,消費組機制實現了消費端的消息隔離,既有廣播也有獨佔的場景支持,而重平衡機制則保證的消費端的健壯性與擴展性。
[1] 徐郡明.Apach Kafka 源碼剖析[M].北京.電子工業出版社,2017.
[2] Kafka深度解析.
[3] 深刻淺出理解基於 Kafka 和 ZooKeeper 的分佈式消息隊列.
[4] Kafka 事務性之冪等性實現.
[5] Kafka水位(high watermark)與leader epoch的討論.
[6] kafka消費者如何分配分區.