關於Kafka __consumer_offests的討論

 衆所周知,__consumer__offsets是一個內部topic,對用戶而言是透明的,除了它的數據文件以及偶爾在日誌中出現這兩點以外,用戶通常是感受不到這個topic的。不過咱們的確知道它保存的是Kafka新版本consumer的位移信息。本文咱們簡單梳理一下這個內部topic(以1.0.0代碼爲分析對象)java

1、什麼時候被建立?json

首先,咱們先來看下 它是什麼時候被建立的?__consumer_offsets建立的時機有不少種,主要包括:數組

  • broker響應FindCoordinatorRequest請求時
  • broker響應MetadataRequest顯式請求__consumer_offsets元數據時

其中以第一種最爲常見,而第一種時機的表現形式可能有不少,好比用戶啓動了一個消費者組(下稱consumer group)進行消費或調用kafka-consumer-groups --describe等spa

2、消息種類線程

__consumer_offsets中保存的記錄是普通的Kafka消息,只是它的格式徹底由Kafka來維護,用戶不能干預。嚴格來講,__consumer_offsets中保存三類消息,分別是:3d

  • Consumer group組元數據消息
  • Consumer group位移消息
  • Tombstone消息

2.1 Consumer group組元數據消息日誌

咱們都知道__consumer_offsets是保存位移的,但實際上每一個消費者組的元數據信息也保存在這個topic。這些元數據包括:orm

這裏不詳細展開組元數據各個字段的含義。咱們只須要知道組元數據消息也是保存在__consumer_offsets中便可。值得一提的是, 若是用戶使用standalone consumer(即consumer.assign(****)方法),那麼就不會寫入這類消息,畢竟咱們使用的是獨立的消費者,而沒有使用消費者組。server

這類消息的key是一個二元組,格式是【版本+groupId】,這裏的版本表徵這類消息的版本號,無實際用途;而value就是上圖全部這些信息打包而成的字節數組。對象

2.2 Consumer group組位移提交消息

若是隻容許說出__consumer_offsets的一個功能,那麼咱們就記住這個好了:__consumer_offsets保存consumer提交到Kafka的位移數據。這句話有兩個要點:1. 只有當consumer group向Kafka提交位移時纔會向__consumer_offsets寫入這類消息。若是你的consumer壓根就不提交位移,或者你將位移保存到了外部存儲中(好比Apache Flink的檢查點機制或老版本的Storm Kafka Spout),那麼__consumer_offsets中就是無位移數據;2. 這句話中的consumer既包含consumer group也包含standalone consumer。也就是說,只要你向Kafka提交位移,不論使用哪一種java consumer,它都是向__consumer_offsets寫消息。

這類消息的key是一個三元組,格式是【groupId + topic + 分區號】,value則是要提交的位移信息,以下圖所示:

位移就是待提交的位移,提交時間是提交位移時的時間戳,而過時時間則是用戶指定的過時時間。因爲目前consumer代碼在提交位移時並無明確指定過時間隔,故broker端默認設置過時時間爲提交時間+offsets.retention.minutes參數值,即提交1天以後自動過時。Kafka會按期掃描__consumer_offsets中的位移消息並刪除掉那些過時的位移數據。

上圖中還有個「自定義元數據」,實際上consumer容許用戶在提交位移時指定一些特殊的自定義信息。咱們不對此進行詳細展開,由於java consumer根本就沒有使用到它。相反地,Kafka Streams利用該字段來完成某些定製任務。

2.3 tombstone消息或Delete Mark消息

 第三類消息成爲tombstone消息或delete mark消息。這類消息只出如今源碼中而不暴露給用戶。它和第一類消息很像,key都是二元組【版本+groupId】,惟一的區別在於這類消息的消息體是null,即空消息體。什麼時候寫入這類消息?前面說過了,Kafka會按期掃描過時位移消息並刪除之。一旦某個consumer group下已沒有任何active成員且全部的位移數據都已被刪除時,Kafka會將該group的狀態置爲Dead並向__consumer__offsets對應分區寫入tombstone消息,代表要完全刪除這個group的信息。簡單來講,這類消息就是用於完全刪除group信息的。

3、什麼時候寫入?

第一類消息是在組rebalance時寫入的;第二類消息是在提交位移時寫入的;第三類消息是在Kafka後臺線程掃描並刪除過時位移或者__consumer_offsets分區副本重分配時寫入的。

4、消息留存策略

__consumer_offsets目前的留存策略是compact,__consumer_offsets會按期對消息內容進行compact操做——用戶也能夠同時啓用兩種留存策略來減小該topic所佔的磁盤空間,不過要承擔可能丟失位移數據的風險。

5、副本因子

__consumer_offest不受server.properties中num.partitions和default.replication.factor參數的制約。相反地,它的分區數和備份因子分別由offsets.topic.num.partitions和offsets.topic.replication.factor參數決定。這兩個參數的默認值分別是50和1,表示該topic有50個分區,副本因子是1。鑑於位移和group元數據等信息都保存在該topic中,實際使用過程當中不少用戶都會將offsets.topic.replication.factor設置成大於1的數以增長可靠性,這是推薦的作法。不過在0.11.0.0以前,這個設置是有缺陷的:假設你設置了offsets.topic.replication.factor = 3,只要Kafka建立該topic時可用broker數<3,那麼建立出來的__consumer_offsets的備份因子就是2。也就是說Kafka並無尊重咱們設置的offsets.topic.replication.factor參數。好在這個問題在0.11.0.0版本獲得瞭解決,如今用戶在使用時,一旦須要建立__consumer_offsets了Kafka必定會保證湊齊足量的broker纔會開始建立,不然就拋出異常給用戶。

平常使用中,另外一個常見的問題是如何擴展該topic的副本因子。因爲它依然是一個Kafka topic,所以咱們能夠調用bin/kafka-reassign-partitions.sh(bat)腳原本擴展replication factor。作法以下:

1. 構造一個json文件,以下所示,其中1,2,3表示3臺broker的ID

{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[2,3,1]}
]}

2. 運行bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute

若是一切正常,你會發現__consumer_offsets的replication factor已然被擴展爲3。

6、如何刪除group信息

首先明確一點,Kafka是會刪除consumer group信息的,既包括位移信息,也包括組元數據信息。對於位移信息而言,前面提到過每條位移消息都設置了過時時間。每一個Kafka broker在後臺會啓動一個線程,按期(由offsets.retention.check.interval.ms肯定,默認10分鐘)掃描過時位移,並刪除之。而對組元數據而言,刪除它們的條件有兩個:1. 這個group下不能存在active成員,即全部成員都已經退出了group;2. 這個group的全部位移信息都已經被刪除了。當知足了這兩個條件後,Kafka後臺線程會刪除group運輸局信息。

好了, 咱們總說刪除,那麼Kafka究竟是怎麼刪除的呢——正是經過寫入具備相同key的tombstone消息。咱們舉個例子,假設__consumer_offsets當前保存有一條位移消息,key是【testGroupid,test, 0】(三元組),value是待提交的位移信息。不管什麼時候,只要咱們向__consumer_offsets相同分區寫入一條key=【testGroupid,test, 0】,value=null的消息,那麼Kafka就會認爲以前的那條位移信息是能夠刪除的了——即至關於咱們向__consumer_offsets中插入了一個delete mark。

再次強調一下,向__consumer_offsets寫入tombstone消息僅僅是標記它以前的具備相同key的消息是能夠被刪除的,但刪除操做一般不會當即開始。真正的刪除操做是由log cleaner的Cleaner線程來執行的。

 

鑑於目前水平有限,能想到的就這麼多。有相關問題的讀者能夠將問題發動評論區,若是具備較大的共性,我會添加到本文的末尾~~

相關文章
相關標籤/搜索