kafka通訊協議(一)

簡介

Kafka使用基於TCP的二進制協議,該協議定義了全部API的請求及響應消息。全部的消息都是經過長度來分隔,而且由後面描述的基本類型組成。html

基本數據類型

  • 定長基本類型
    int8/int16/int32/int64這些都是不一樣精度的帶符號整數,以大端(big endian)方式存儲。
  • 變長基本類型
    這些類型由一個表示長度的帶符號整數N以及後續N字節的內容組成,長度若是爲-1則表示空。
    string使用int16表示長度,bytes使用int32表示長度。
  • 數組
    這個類型用來處理重複的結構體數據,它們老是由一個表明元素個數int32的整數N,以及後續N個重複結構體組成。
    這些結構體自身是由其餘的基本數據類型組成。

通用的請求和響應格式

  • 全部請求和響應都遵循如下語法基礎:
RequestOrResponse --> Size( RequestMessage | ResponseMessage )
    Size --> int32

Size給出了後續請求或響應消息的字節長度。
解析時應先讀取4字節的長度N,而後讀取並解析後續的N字節請求/響應內容。node

  • 全部請求都具備如下格式:
RequestMessage --> request_header request
    request_header --> api_key api_version correlation_id client_id
        api_key --> int16
        api_version --> int16
        correlation_id -->int32
        client_id --> string
    request --> MetadataRequest | ProduceRequest | FetchRequest | ...

api_key: 這是一個表示所調用的API的數字ID(即不一樣類型的請求有不一樣的ID)
api_version: 這是該API的一個數字版本號,Kafka爲每一個API定義一個版本號,該版本號容許服務器根據版本號正確地解釋請求內容。
correlation_id: 這是一個用戶提供的整數。它將被服務器原封不動的傳回給客戶端。用於匹配客戶端和服務器之間的請求和響應。
client_id: 這是客戶端應用程序自定義的標識
request: 具體的請求內容,例如元數據請求、生產請求、獲取消息請求、偏移量請求等。apache

  • 全部響應都具備如下格式:
ResponseMessage --> response_header response
    response_header --> correlation_id
        correlation_id --> int32
    response --> MetadataResponse | ProduceResponse | ...

correlation_id: 即請求中攜帶的correlation_id
response: 具體的響應內容api

  • 消息集(MessageSet)格式
MessageSet --> [Offset MessageSize Message]
    Offset --> int64
    MessageSize --> int32
    Message --> Crc MagicByte Attributes Key Value
        Crc --> int32
        MagicByte --> int8
        Attributes --> int8
        Key --> bytes
        Value --> bytes

Offset: 這是在Kafka中做爲日誌序列號使用的偏移量。
Crc:Crc是的剩餘消息字節的CRC32值。broker和消費者可用來檢查信息的完整性。
MagicByte: 這是一個用於容許消息二進制格式的向後兼容演化的版本id。當前值是0。 Attributes: 這個字節保存有關信息的元數據屬性。
最低的3位包含用於消息的壓縮編解碼器。
第四位表示時間戳類型,0表明CreateTime,1表明LogAppendTime。生產者必須把這個位設成0。
全部其餘位必須被設置爲0。
Timestamp: 消息的時間戳。時間戳類型在Attributes域中體現。單位爲從UTC標準準時間1970年1月1日0點到所在時間的毫秒數。
Key: Key是一個可選項,它主要用來進行指派分區。Key能夠爲null。
Value: Value是消息的實際內容,類型是字節數組。
注:此格式爲V0版本的格式,V1詳見官方文檔,一樣後續具體請求格式均以V0版本進行說明數組

部分請求響應的具體格式

  • SASL握手請求響應(api_key: 17)
SaslHandshake Request --> mechanism
    mechanism --> string

mechanism: 客戶端選擇的SASL機制服務器

SaslHandshake Response --> error_code [enabled_mechanisms]
    error_code --> int16
    enabled_mechanisms --> string

error_code: 錯誤碼
enabled_mechanisms: 服務端使能的SASL mechanisms列表ide

  • 元數據請求響應(api_key: 3)
Metadata Request --> [topics]
    topics --> string

topics: 要獲取元數據的topic數組。 若是爲空,則返回全部topic的元數據。性能

Metadata Response --> [brokers] [topic_metadata]
    brokers --> node_id host port
        node_id --> int32
        host --> string
        port --> int32
    topic_metadata --> error_code topic [partition_metadata] 
        error_code --> int16
        topic --> string
        partition_metadata --> error_code partition leader [replicas] [isr]
            error_code --> int16
            partition --> int32
            leader --> int32
            replicas --> int32
            isr --> int32

node_id: broker的id, 即kafka節點的ID
host: broker的主機名
port: broker偵聽端口
partition: topic對應partition的id
leader: 該topic全部partition中,扮演leader角色的partition所在的broker的id
replicas: 全部partition中,做爲slave的節點集合
isr: 副本集合中,與leader處於跟隨狀態的節點集合fetch

  • 生產請求響應(api_key: 0)
ProduceRequest --> asks timeout [topic [partition messageSetSize MessageSet]]
    acks --> int16
    timeout --> int32
    topic --> string
    partition --> int32
    messageSetSize --> int32

acks: 這個值表示服務端收到多少確認後才發送反饋消息給客戶端,若是設置爲0,即服務端不發送response;若是設置爲1,即服務端將等到數據寫入本地日誌後發送response;若是設置爲-1,服務端將阻塞,直到這個消息被全部的同步副本寫入後再發送response
timeout: 這個值提供了以毫秒爲單位的超時時間
topic:該數據將會發布到的topic名稱
partition:該數據將會發布到的分區
messageSetSize: 後續消息集的長度,單位是字節
messageSet:前面描述的標準格式的消息集合ui

ProduceResponse --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses --> partition error_code base_offset
            partition --> int32
            error_code --> int16
            base_offset --> int64

topic: 此響應對應的topic名稱
partition: topic對應的partition的id
base_offset: produce的消息在partition中的偏移

  • 獲取消息請求響應(api_key: 1)
Fetch Request --> replica_id max_wait_time min_bytes [topics]
    replica_id --> int32
    max_wait_time --> int32
    min_bytes --> int32
    topics --> topic [partitions]
        partitions --> partition fetch_offset max_bytes
            partition --> int32
            fetch_offset --> int64
            max_bytes --> int32

replica_id: 發起這個請求的副本節點ID
max_wait_time: 若是沒有足夠的數據可發送時,最大阻塞等待時間,以毫秒爲單位
min_bytes: 返回響應消息的最小字節數目,必須設置。若是客戶端將此值設爲0,服務器將會當即返回,但若是沒有新的數據,服務端會返回一個空消息集。若是它被設置爲1,則服務器將在至少一個分區收到一個字節的數據的狀況下當即返回,或者等到超時時間達到。經過設置較高的值,結合超時設置,消費者能夠在犧牲一點實時性能的狀況下經過一次讀取較大的字節的數據塊從而提升的吞吐量
topic: topic的名稱
partition: topic對應partition的id
fetch_offset: 獲取數據的起始偏移量
max_bytes: 此分區返回消息集所能包含的最大字節數。

Fetch Response --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses --> partition_header messageSet
            partition_header --> partition error_code high_watermark
                parttion --> int32
                error_code --> int16
                high_watermark --> int64

high_watermark: 該partition分區中最後提交的消息的偏移量。此信息可被客戶端用來肯定後面還有多少條消息。

  • 偏移量請求響應(api_key: 2)
ListOffsets Request --> replica_id [topics]
    replica_id --> int32
    topics --> topic [partitions]
        topic --> string
        partitions --> partition timestamp max_num_offsets
            partition --> int32
            timestamp --> int64
            max_num_offsets --> int32

timestamp: 用來請求必定時間(單位:毫秒)前的全部消息
-1表示獲取最新的offset,即下一個produce的消息的offset
-2表示獲取最先的有效偏移量

ListOffsets Response --> [responses]
    responses --> topic [partition_responses]
        topic --> string
        partition_responses -->partition error_code [offsets]
            partition --> int32
            error_code --> int16
            offsets --> int64

參考

官網
A Guide To The Kafka Protocol
Kafka通信協議指南

相關文章
相關標籤/搜索