一文看懂 Kafka 消息格式的演變

摘要

對於一個成熟的消息中間件而言,消息格式不只關係到功能維度的擴展,還牽涉到性能維度的優化。隨着Kafka的迅猛發展,其消息格式也在不斷的升級改進,從0.8.x版本開始到如今的1.1.x版本,Kafka的消息格式也經歷了3個版本。本文這裏主要來說述Kafka的三個版本的消息格式的演變,文章偏長,建議先關注後鑑定。node


Kafka根據topic(主題)對消息進行分類,發佈到Kafka集羣的每條消息都須要指定一個topic,每一個topic將被分爲多個partition(分區)。每一個partition在存儲層面是追加log(日誌)文件,任何發佈到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset爲一個long型的數值,它惟一標記一條消息。 算法

每一條消息被髮送到Kafka中,其會根據必定的規則選擇被存儲到哪個partition中。若是規則設置的合理,全部的消息能夠均勻分佈到不一樣的partition裏,這樣就實現了水平擴展。如上圖,每一個partition由其上附着的每一條消息組成,若是消息格式設計的不夠精煉,那麼其功能和性能都會大打折扣。好比有冗餘字段,勢必會使得partition沒必要要的增大,進而不只使得存儲的開銷變大、網絡傳輸的開銷變大,也會使得Kafka的性能降低;又好比缺乏字段,在最初的Kafka消息版本中沒有timestamp字段,對內部而言,其影響了日誌保存、切分策略,對外部而言,其影響了消息審計、端到端延遲等功能的擴展,雖然能夠在消息體內部添加一個時間戳,可是解析變長的消息體會帶來額外的開銷,而存儲在消息體(參考下圖中的value字段)前面能夠經過指針偏量獲取其值而容易解析,進而減小了開銷(能夠查看v1版本),雖然相比於沒有timestamp字段的開銷會差一點。如此分析,僅在一個字段的一增一減之間就有這麼多門道,那麼Kafka具體是怎麼作的呢?本文只針對Kafka 0.8.x版本開始作相應說明,對於以前的版本不作陳述。bash

v0版本

對於Kafka消息格式的第一個版本,咱們把它稱之爲v0,在Kafka 0.10.0版本以前都是採用的這個消息格式。注意如無特殊說明,咱們只討論消息未壓縮的情形。 服務器

上左圖中的「RECORD」部分就是v0版本的消息格式,大多數人會把左圖中的總體,即包括offset和message size字段都都當作是消息,由於每一個Record(v0和v1版)一定對應一個offset和message size。每條消息都一個offset用來標誌它在partition中的偏移量,這個offset是邏輯值,而非實際物理偏移值,message size表示消息的大小,這二者的一塊兒被稱之爲日誌頭部(LOG_OVERHEAD),固定爲12B。LOG_OVERHEAD和RECORD一塊兒用來描述一條消息。與消息對應的還有消息集的概念,消息集中包含一條或者多條消息,消息集不只是存儲於磁盤以及在網絡上傳輸(Produce & Fetch)的基本形式,並且是kafka中壓縮的基本單元,詳細結構參考上右圖。網絡

下面來具體陳述一下消息(Record)格式中的各個字段,從crc32開始算起,各個字段的解釋以下:app

  • 1. crc32(4B):crc32校驗值。校驗範圍爲magic至value之間。
  • 2. magic(1B):消息格式版本號,此版本的magic值爲0。
  • 3. attributes(1B):消息的屬性。總共佔1個字節,低3位表示壓縮類型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其他位保留。
  • 4. key length(4B):表示消息的key的長度。若是爲-1,則表示沒有設置key,即key=null。
  • 5. key:可選,若是沒有key則無此字段。
  • 6. value length(4B):實際消息體的長度。若是爲-1,則表示消息爲空。
  • 7. value:消息體。能夠爲空,好比tomnstone消息。

v0版本中一個消息的最小長度(RECORD_OVERHEAD_V0)爲crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是說v0版本中一條消息的最小長度爲14B,若是小於這個值,那麼這就是一條破損的消息而不被接受。工具

這裏咱們來作一個測試,首先建立一個partition數和副本數都爲1的topic,名稱爲「msg_format_v0」,而後往msg_format_v0中發送一條key=」key」,value=」value」的消息,以後查看對應的日誌:性能

[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /tmp/kafka-logs-08/msg_format_v0-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 5 magic: 0 compresscodec: NoCompressionCodec crc: 592888119 keysize: 3
複製代碼

查看消息的大小,即00000000000000000000.log文件的大小爲34B,其值正好等於LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B。測試

[root@node1 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root       34 Apr 26 02:52 00000000000000000000.log
複製代碼

咱們再發送一條key=null, value=」value」的消息,以後查看日誌的大小:優化

[root@node3 msg_format_v0-0]# ll *.log
-rw-r--r-- 1 root root       65 Apr 26 02:56 00000000000000000000.log
複製代碼

日誌大小爲65B,減去上一條34B的消息,能夠得知本條消息的大小爲31B,正好等於LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 5B的value = 12B + 14B+ 5B = 31B。

v1版本

kafka從0.10.0版本開始到0.11.0版本以前所使用的消息格式版本爲v1,其比v0版本就多了一個timestamp字段,表示消息的時間戳。v1版本的消息結構圖以下所示:

v1版本的magic字段值爲1。v1版本的attributes字段中的低3位和v0版本的同樣,仍是表示壓縮類型,而第4個bit也被利用了起來:0表示timestamp類型爲CreateTime,而1表示tImestamp類型爲LogAppendTime,其餘位保留。v1版本的最小消息(RECORD_OVERHEAD_V1)大小要比v0版本的要大8個字節,即22B。若是像v0版本介紹的同樣發送一條key=」key」,value=」value」的消息,那麼此條消息在v1版本中會佔用42B,具體測試步驟參考v0版的相關介紹。

消息壓縮

常見的壓縮算法是數據量越大壓縮效果越好,一條消息一般不會太大,這就致使壓縮效果並不太好。而kafka實現的壓縮方式是將多條消息一塊兒進行壓縮,這樣能夠保證較好的壓縮效果。並且在通常狀況下,生產者發送的壓縮數據在kafka broker中也是保持壓縮狀態進行存儲,消費者從服務端獲取也是壓縮的消息,消費者在處理消息以前纔會解壓消息,這樣保持了端到端的壓縮。

壓縮率是壓縮後的大小與壓縮前的對比。例如:把100MB的文件壓縮後是90MB,壓縮率爲90/100*100%=90%,壓縮率通常是越小壓縮效果越好。通常口語化陳述時會誤描述爲壓縮率越高越好,爲了不混淆,本文不引入學術上的壓縮率而引入壓縮效果,這樣容易達成共識。

講解到這裏都是針對消息未壓縮的狀況,而當消息壓縮時是將整個消息集進行壓縮而做爲內層消息(inner message),內層消息總體做爲外層(wrapper message)的value,其結構圖以下所示:

壓縮後的外層消息(wrapper message)中的key爲null,因此圖右部分沒有畫出key這一部分。當生產者建立壓縮消息的時候,對內部壓縮消息設置的offset是從0開始爲每一個內部消息分配offset,詳細能夠參考下圖右部:

其實每一個從生產者發出的消息集中的消息offset都是從0開始的,固然這個offset不能直接存儲在日誌文件中,對offset進行轉換時在服務端進行的,客戶端不須要作這個工做。外層消息保存了內層消息中最後一條消息的絕對位移(absolute offset),絕對位移是指相對於整個partition而言的。參考上圖,對於未壓縮的情形,圖右內層消息最後一條的offset理應是1030,可是被壓縮以後就變成了5,而這個1030被賦予給了外層的offset。當消費者消費這個消息集的時候,首先解壓縮整個消息集,而後找到內層消息中最後一條消息的inner offset,而後根據以下公式找到內層消息中最後一條消息前面的消息的absolute offset(RO表示Relative Offset,IO表示Inner Offset,而AO表示Absolute Offset):

RO = IO_of_a_message - IO_of_the_last_message
AO = AO_Of_Last_Inner_Message + RO
複製代碼

注意這裏RO是前面的消息相對於最後一條消息的IO而言的,因此其值小於等於0,0表示最後一條消息自身。

壓縮消息,英文是compress message,Kafka中還有一個compact message,經常也會被人們直譯成壓縮消息,須要注意二者的區別。compact message是針對日誌清理策略而言的(cleanup.policy=compact),是指日誌壓縮(log compaction)後的消息,這個後續的系列文章中會有介紹。本文中的壓縮消息單指compress message,即採用GZIP、LZ4等壓縮工具壓縮的消息。

在講述v1版本的消息時,咱們瞭解到v1版本比v0版的消息多了個timestamp的字段。對於壓縮的情形,外層消息的timestamp設置爲:

  1. 若是timestamp類型是CreateTime,那麼設置的是內層消息中最大的時間戳(the max timestampof inner messages if CreateTime is used)。
  2. 若是timestamp類型是LogAppendTime,那麼設置的是kafka服務器當前的時間戳;

內層消息的timestamp設置爲:

  1. 若是外層消息的timestamp類型是CreateTime,那麼設置的是生產者建立消息時的時間戳。
  2. 若是外層消息的timestamp類型是LogAppendTime,那麼全部的內層消息的時間戳都將被忽略。

對於attributes字段而言,它的timestamp位只在外層消息(wrapper message)中設置,內層消息(inner message)中的timestamp類型一直都是CreateTime。

v2版本

kafka從0.11.0版本開始所使用的消息格式版本爲v2,這個版本的消息相比於v0和v1的版本而言改動很大,同時還參考了Protocol Buffer而引入了變長整型(Varints)和ZigZag編碼。Varints是使用一個或多個字節來序列化整數的一種方法,數值越小,其所佔用的字節數就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭於正負整數之間,以使得帶符號整數映射爲無符號整數,這樣可使得絕對值較小的負數仍然享有較小的Varints編碼值,好比-1編碼爲1,1編碼爲2,-2編碼爲3。詳細能夠參考:developers.google.com/protocol-bu…

回顧一下kafka v0和v1版本的消息格式,若是消息自己沒有key,那麼key length字段爲-1,int類型的須要4個字節來保存,而若是採用Varints來編碼則只須要一個字節。根據Varints的規則能夠推導出0-63之間的數字佔1個字節,64-8191之間的數字佔2個字節,8192-1048575之間的數字佔3個字節。而kafka broker的配置message.max.bytes的默認大小爲1000012(Varints編碼佔3個字節),若是消息格式中與長度有關的字段採用Varints的編碼的話,絕大多數狀況下都會節省空間,而v2版本的消息格式也正是這樣作的。不過須要注意的是Varints並不是一直會省空間,一個int32最長會佔用5個字節(大於默認的4字節),一個int64最長會佔用10字節(大於默認的8字節)。

v2版本中消息集謂之爲Record Batch,而不是先前的Message Set了,其內部也包含了一條或者多條消息,消息的格式參見下圖中部和右部。在消息壓縮的情形下,Record Batch Header部分(參見下圖左部,從first offset到records count字段)是不被壓縮的,而被壓縮的是records字段中的全部內容。

先來說述一下消息格式Record的關鍵字段,能夠看到內部字段大量採用了Varints,這樣Kafka能夠根據具體的值來肯定須要幾個字節來保存。v2版本的消息格式去掉了crc字段,另外增長了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增量)和headers信息,而且attributes被棄用了,筆者對此作以下分析(對於key、key length、value、value length字段和v0以及v1版本的同樣,這裏再也不贅述):

1. length:消息總長度。 2. attributes:棄用,可是仍是在消息格式中佔據1B的大小,以備將來的格式擴展。 3. timestamp delta:時間戳增量。一般一個timestamp須要佔用8個字節,若是像這裏保存與RecordBatch的其實時間戳的差值的話能夠進一步的節省佔用的字節數。 4. offset delta:位移增量。保存與RecordBatch起始位移的差值,能夠節省佔用的字節數。 5. headers:這個字段用來支持應用級別的擴展,而不須要像v0和v1版本同樣不得不將一些應用級別的屬性值嵌入在消息體裏面。Header的格式如上圖最有,包含key和value,一個Record裏面能夠包含0至多個Header。

若是對於v1版本的消息,若是用戶指定的timestamp類型是LogAppendTime而不是CreateTime,那麼消息從發送端(Producer)進入broker端以後timestamp字段會被更新,那麼此時消息的crc值將會被從新計算,而此值在Producer端已經被計算過一次;再者,broker端在進行消息格式轉換時(好比v1版轉成v0版的消息格式)也會從新計算crc的值。在這些相似的狀況下,消息從發送端到消費端(Consumer)之間流動時,crc的值是變更的,須要計算兩次crc的值,因此這個字段的設計在v0和v1版本中顯得比較雞肋。在v2版本中將crc的字段從Record中轉移到了RecordBatch中。

v2版本對於消息集(RecordBatch)作了完全的修改,參考上圖左部,除了剛剛說起的crc字段,還多了以下字段:

1. first offset:表示當前RecordBatch的起始位移。 2. length:計算partition leader epoch到headers之間的長度。 3. partition leader epoch:用來確保數據可靠性。 4. magic:消息格式的版本號,對於v2版本而言,magic等於2。 5. attributes:消息屬性,注意這裏佔用了兩個字節。低3位表示壓縮格式,能夠參考v0和v1;第4位表示時間戳類型;第5位表示此RecordBatch是否處於事務中,0表示非事務,1表示事務。第6位表示是不是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用來支持事務功能。 6. last offset delta:RecordBatch中最後一個Record的offset與first offset的差值。主要被broker用來確認RecordBatch中Records的組裝正確性。 7. first timestamp:RecordBatch中第一條Record的時間戳。 8. max timestamp:RecordBatch中最大的時間戳,通常狀況下是指最後一個Record的時間戳,和last offset delta的做用同樣,用來確保消息組裝的正確性。 9. producer id:用來支持冪等性。 10. producer epoch:和producer id同樣,用來支持冪等性。 11. first sequence:和producer id、producer epoch同樣,用來支持冪等性。 12. records count:RecordBatch中Record的個數。

這裏咱們再來作一個測試,在1.0.0的kafka中建立一個partition數和副本數都爲1的topic,名稱爲「msg_format_v2」。而後一樣插入一條key=」key」,value=」value」的消息,查看日誌結果以下:

[root@node1 kafka_2.12-1.0.0]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1524709879130 isvalid: true size: 76 magic: 2 compresscodec: NONE crc: 2857248333
複製代碼

能夠看到size字段爲76,咱們根據上圖中的v2版本的日誌格式來驗證一下,Record Batch Header部分共61B。Record部分中attributes佔1B;timestamp delta值爲0,佔1B;offset delta值爲0,佔1B;key length值爲3,佔1B,key佔3B;value length值爲5,佔1B,value佔5B;headers count值爲0,佔1B, 無headers。Record部分的總長度=1B+1B+1B+1B+3B+1B+5B+1B=14B,因此Record的length字段值爲14,編碼爲變長整型佔1B。最後推到出這條消息的佔用字節數=61B+14B+1B=76B,符合測試結果。一樣再發一條key=null,value=」value」的消息的話,能夠計算出這條消息佔73B。

這麼看上去好像v2版本的消息比以前版本的消息佔用空間要大不少,的確對於單條消息而言是這樣的,若是咱們連續往msg_format_v2中再發送10條value長度爲6,key爲null的消息,能夠獲得:

baseOffset: 2 lastOffset: 11 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 149 CreateTime: 1524712213771 isvalid: true size: 191 magic: 2 compresscodec: NONE crc: 820363253
複製代碼

原本應該佔用740B大小的空間,實際上只佔用了191B,若是在v0版本中這10條消息則須要佔用320B的空間,v1版本則須要佔用400B的空間,這樣看來v2版本又節省了不少的空間,由於其將多個消息(Record)打包存放到單個RecordBatch中,又經過Varints編碼極大的節省了空間。

就以v1和v2版本對比而立,至於哪一個消息格式佔用空間大是不肯定的,要根據具體狀況具體分析。好比每條消息的大小爲16KB,那麼一個消息集中只能包含有一條消息(參數batch.size默認大小爲16384),因此v1版本的消息集大小爲12B+22B+16384B=16418B。而對於v2版本而言,其消息集大小爲61B+11B+16384B=17086B(length值爲16384+,佔用3B,value length值爲16384,佔用大小爲3B,其他數值型的字段均可以只佔用1B的空間)。能夠看到v1版本又會比v2版本節省些許空間。

其實能夠思考一下:當消息體越小,v2版本中的Record字段的佔用會比v1版本的LogHeader+Record佔用越小,以致於某個臨界點能夠徹底忽略到v2版本中Record Batch Header的61B大小的影響。就算消息體很大,v2版本的空間佔用也不會比v1版本的空間佔用大太多,幾十個字節內,反觀對於這種大消息體的大小而言,這幾十個字節的大小從某種程度上又能夠忽略。

因而可知,v2版本的消息不只提供了相似事務、冪等等更多的功能,還對空間佔用提供了足夠的優化,整體提高很大。也由此體現一個優秀的設計是多麼的重要,雖說咱們不要過分的設計和優化,那麼是否能夠着眼於前來思考一下?kafka爲咱們作了一個很好的榜樣。

相關文章
相關標籤/搜索