Kafka使用基於TCP的二進制協議,該協議定義了全部API的請求及響應消息。全部的消息都是經過長度來分隔,而且由後面描述的基本類型組成。html
RequestOrResponse --> Size( RequestMessage | ResponseMessage ) Size --> int32
Size給出了後續請求或響應消息的字節長度。
解析時應先讀取4字節的長度N,而後讀取並解析後續的N字節請求/響應內容。node
RequestMessage --> request_header request request_header --> api_key api_version correlation_id client_id api_key --> int16 api_version --> int16 correlation_id -->int32 client_id --> string request --> MetadataRequest | ProduceRequest | FetchRequest | ...
api_key: 這是一個表示所調用的API的數字ID(即不一樣類型的請求有不一樣的ID)
api_version: 這是該API的一個數字版本號,Kafka爲每一個API定義一個版本號,該版本號容許服務器根據版本號正確地解釋請求內容。
correlation_id: 這是一個用戶提供的整數。它將被服務器原封不動的傳回給客戶端。用於匹配客戶端和服務器之間的請求和響應。
client_id: 這是客戶端應用程序自定義的標識
request: 具體的請求內容,例如元數據請求、生產請求、獲取消息請求、偏移量請求等。apache
ResponseMessage --> response_header response response_header --> correlation_id correlation_id --> int32 response --> MetadataResponse | ProduceResponse | ...
correlation_id: 即請求中攜帶的correlation_id
response: 具體的響應內容api
MessageSet --> [Offset MessageSize Message] Offset --> int64 MessageSize --> int32 Message --> Crc MagicByte Attributes Key Value Crc --> int32 MagicByte --> int8 Attributes --> int8 Key --> bytes Value --> bytes
Offset: 這是在Kafka中做爲日誌序列號使用的偏移量。
Crc:Crc是的剩餘消息字節的CRC32值。broker和消費者可用來檢查信息的完整性。
MagicByte: 這是一個用於容許消息二進制格式的向後兼容演化的版本id。當前值是0。 Attributes: 這個字節保存有關信息的元數據屬性。
最低的3位包含用於消息的壓縮編解碼器。
第四位表示時間戳類型,0表明CreateTime,1表明LogAppendTime。生產者必須把這個位設成0。
全部其餘位必須被設置爲0。
Timestamp: 消息的時間戳。時間戳類型在Attributes域中體現。單位爲從UTC標準準時間1970年1月1日0點到所在時間的毫秒數。
Key: Key是一個可選項,它主要用來進行指派分區。Key能夠爲null。
Value: Value是消息的實際內容,類型是字節數組。
注:此格式爲V0版本的格式,V1詳見官方文檔,一樣後續具體請求格式均以V0版本進行說明數組
SaslHandshake Request --> mechanism mechanism --> string
mechanism: 客戶端選擇的SASL機制服務器
SaslHandshake Response --> error_code [enabled_mechanisms] error_code --> int16 enabled_mechanisms --> string
error_code: 錯誤碼
enabled_mechanisms: 服務端使能的SASL mechanisms列表ide
Metadata Request --> [topics] topics --> string
topics: 要獲取元數據的topic數組。 若是爲空,則返回全部topic的元數據。性能
Metadata Response --> [brokers] [topic_metadata] brokers --> node_id host port node_id --> int32 host --> string port --> int32 topic_metadata --> error_code topic [partition_metadata] error_code --> int16 topic --> string partition_metadata --> error_code partition leader [replicas] [isr] error_code --> int16 partition --> int32 leader --> int32 replicas --> int32 isr --> int32
node_id: broker的id, 即kafka節點的ID
host: broker的主機名
port: broker偵聽端口
partition: topic對應partition的id
leader: 該topic全部partition中,扮演leader角色的partition所在的broker的id
replicas: 全部partition中,做爲slave的節點集合
isr: 副本集合中,與leader處於跟隨狀態的節點集合fetch
ProduceRequest --> asks timeout [topic [partition messageSetSize MessageSet]] acks --> int16 timeout --> int32 topic --> string partition --> int32 messageSetSize --> int32
acks: 這個值表示服務端收到多少確認後才發送反饋消息給客戶端,若是設置爲0,即服務端不發送response;若是設置爲1,即服務端將等到數據寫入本地日誌後發送response;若是設置爲-1,服務端將阻塞,直到這個消息被全部的同步副本寫入後再發送response
timeout: 這個值提供了以毫秒爲單位的超時時間
topic:該數據將會發布到的topic名稱
partition:該數據將會發布到的分區
messageSetSize: 後續消息集的長度,單位是字節
messageSet:前面描述的標準格式的消息集合ui
ProduceResponse --> [responses] responses --> topic [partition_responses] topic --> string partition_responses --> partition error_code base_offset partition --> int32 error_code --> int16 base_offset --> int64
topic: 此響應對應的topic名稱
partition: topic對應的partition的id
base_offset: produce的消息在partition中的偏移
Fetch Request --> replica_id max_wait_time min_bytes [topics] replica_id --> int32 max_wait_time --> int32 min_bytes --> int32 topics --> topic [partitions] partitions --> partition fetch_offset max_bytes partition --> int32 fetch_offset --> int64 max_bytes --> int32
replica_id: 發起這個請求的副本節點ID
max_wait_time: 若是沒有足夠的數據可發送時,最大阻塞等待時間,以毫秒爲單位
min_bytes: 返回響應消息的最小字節數目,必須設置。若是客戶端將此值設爲0,服務器將會當即返回,但若是沒有新的數據,服務端會返回一個空消息集。若是它被設置爲1,則服務器將在至少一個分區收到一個字節的數據的狀況下當即返回,或者等到超時時間達到。經過設置較高的值,結合超時設置,消費者能夠在犧牲一點實時性能的狀況下經過一次讀取較大的字節的數據塊從而提升的吞吐量
topic: topic的名稱
partition: topic對應partition的id
fetch_offset: 獲取數據的起始偏移量
max_bytes: 此分區返回消息集所能包含的最大字節數。
Fetch Response --> [responses] responses --> topic [partition_responses] topic --> string partition_responses --> partition_header messageSet partition_header --> partition error_code high_watermark parttion --> int32 error_code --> int16 high_watermark --> int64
high_watermark: 該partition分區中最後提交的消息的偏移量。此信息可被客戶端用來肯定後面還有多少條消息。
ListOffsets Request --> replica_id [topics] replica_id --> int32 topics --> topic [partitions] topic --> string partitions --> partition timestamp max_num_offsets partition --> int32 timestamp --> int64 max_num_offsets --> int32
timestamp: 用來請求必定時間(單位:毫秒)前的全部消息
-1表示獲取最新的offset,即下一個produce的消息的offset
-2表示獲取最先的有效偏移量
ListOffsets Response --> [responses] responses --> topic [partition_responses] topic --> string partition_responses -->partition error_code [offsets] partition --> int32 error_code --> int16 offsets --> int64