kafka二進制協議分析與PHP客戶端開發

最近分享了《應用層私有協議的設計和實戰》,對應用層私有協議設計作了一些介紹,同時也對協議設計中經常使用的數據類型作了比較形象的講解,今天咱們來研究一下kafka的二進制協議。php

數據類型

kafka二進制協議定義了許多的數據類型,包含經常使用的數字、字符串,也包含了數組等類型。java

本文主要討論不可變長數據類型,可變長度(如Google Protocol Buffers)不在討論範圍內。node

數據類型 字節長度 說明
BOOLEAN 1 布爾值
INT8 1 單字節整型,-2^7 ~ 2^7-1
INT16 2 雙字節整型,大端序,範圍 -2^15 ~ 2^15 - 1
INT32 4 四字節整型、大端序,範圍 -2^31 ~ 2^31 - 1
INT64 8 八字節整型、大端序,範圍 -2^63 ~ 2^63 -1
UINT32 4 十字街
UUID 16 16字節,Java UUID類型
STRING 2+N 頭部由2字節標識字符串長度N,後續N字節爲字符串內容
NULLABLE_STRING 2+N 頭部由2字節標識字符串長度N,後續N字節爲字符串內容,N爲-1時無後續內容
BYTES 4+N 頭部4字節標識字節數組長度,後續N字節爲字節數組內容
NULLABLE_BYTES 4+N 頭部4字節標識字節數組長度,後續N字節爲字節數組內容,N爲-1時無後續內容
ARRAY 4+N*M 頭部4字節標識數組長度N,M爲單個數組元素的長度,N爲-1時爲空數組

錯誤碼

  • -1 未知錯誤
  • 0 未出錯
  • 大於0, 具體錯誤

kafka內置的操做類型有點多,有興趣的能夠參閱kafka錯誤碼git

Api Keys

能夠理解爲操做碼,服務端根據該字段區分當前請求操做。github

這裏不作展開,有興趣的能夠參閱kafka Api Keysgolang

報文結構

接下來咱們重點分析一下kafka的報文結構。apache

本文基於kafka V1版本協議寫做,其餘版本的研究原理時一致的。

總體結構

kafka的協議結構比較簡單,請求和響應使用一樣的總體結構。segmentfault

RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32

咱們轉化爲表格來看看api

image-20200117172959642

  • Size爲INT32類型,正文長度
  • Message 爲請求或響應正文的內容,變長字段,長度由Size給出

請求格式

請求數據包有固定的請求包頭,咱們來看看。數組

Request Header v1 => request_api_key request_api_version correlation_id client_id 
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING

上面給出的是請求頭的內容,結合總體結構得出的協議表格以下:

image-20200117173117965

  • Size 4字節正文長度(包含請求頭)
  • request_api_key 2字節 api key,用來區分操做
  • request_api_version 2字節api 版本號
  • correlation_id 4字節請求ID,服務端會原樣響應該請求ID
  • client_id 可空字符串,根據kafka數據類型定義,須要2字節client_id length字段標識client_id長度,若是client_id length爲-1,則不須要傳具體的client_id,不然須要傳遞client_id
  • request message* 請求正文,不一樣的api key請求正文不一樣

響應格式

Response Header v1 => correlation_id TAG_BUFFER 
  correlation_id => INT32

響應頭的結構比較簡單,返回了請求ID

image-20200117173658934

  • Size 4字節響應正文長度(包含請求ID)
  • correlation_id 4字節請求ID
  • response message* 響應正文

Metadata 示例

請求數據

Kafka Metadata對應的協議格式以下

Metadata Request (Version: 1) => [topics] 
  topics => name 
    name => STRING

咱們轉化爲表格看看

image-20200117173855988

  • Size 4字節請求正文長度
  • Request_api_key,根據協議文檔, 此處爲3
  • Request_api_version,本文基於v1版本寫做,所以版本號爲1
  • correlation_id 請求ID
  • client_id length 2字節客戶端長度,咱們使用test做爲客戶端標識,此處傳入4
  • client_id 客戶端名稱,傳入test字符串
  • topic name length 須要查詢的topic數組,咱們查詢test1這個topic,此處傳入1
  • topic name 字符串類型,所以先寫入字符串長度5(test1字符串長度爲5),再寫入test1字符串(總共寫入2+5 = 7個字節)

響應數據

Metadata Response (Version: 1) => [brokers] controller_id [topics] 
  brokers => node_id host port rack 
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  controller_id => INT32
  topics => error_code name is_internal [partitions] 
    error_code => INT16
    name => STRING
    is_internal => BOOLEAN
    partitions => error_code partition_index leader_id [replica_nodes] [isr_nodes] 
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      replica_nodes => INT32
      isr_nodes => INT32

image-20200117174211271

  • Size 4字節響應長度
  • Correlation_id 4字節請求ID
  • Broker Count,數組類型,4字節整型標識數組長度

    • node_id 4字節整型,broker的節點ID
    • host 字符串類型,主機名稱
    • port 4字節整型,端口號
    • rack 可空字符串,若是broker是rack,則須要2+N字節,不然只須要2字節
  • Controller_id 4字節整型
  • Topics 數組類型,topic數組

    • error_code 2字節整型,錯誤碼
    • name 字符串類型,topic名稱
    • is_internal 布爾類型,是否內部topic
    • partions 數組類型,topic所在partition

      • error_code 2字節整型,錯誤碼
      • partition_index 4字節整型,partition index
      • leader_id 4字節整型,leader id
      • Replica_nodes 數組類型

        • Replica_node 4字節整型
      • isr_nodes 數組類型

        • Isr_node 4字節整型
其餘類型的請求也可使用一樣的方式去分析

PHP客戶端實現

PHP自帶了pack/unpack函數幫助咱們操做二進制數據,不過pack/unpack易用性比較低。

對於二進制數據,java有byte[],golang有[]byte,PHP沒有專門的類型,而是使用字符串存儲的,不過PHP字符串是二進制安全的。

針對pack/unpack函數易用性問題,這兩天參考Java的IO系統開發了一個簡單版本的io庫來簡化二進制數據流的操做(文末有倉庫地址)。

接下來使用該庫來編寫一個kafka的客戶端。

<?php
/**
 * 讀取kafka broker列表
 */
require __DIR__ . '/../vendor/autoload.php';

use io\BinaryStringInputStream;
use io\BinaryStringOutputStream;
use io\DataInputStream;
use io\DataOutputStream;
use io\FileInputStream;
use io\FileOutputStream;

$client = stream_socket_client('tcp://127.0.0.1:9092', $errno, $errstr, 5);
if ($errno) {
    die($errstr);
}


$binaryOutputStream = new BinaryStringOutputStream();
$binaryPacketOutput = new DataOutputStream($binaryOutputStream);
$binaryPacketOutput->writeUnSignedShortBE(0x03); // METADATA_REQUEST
$binaryPacketOutput->writeUnSignedShortBE(1); // API_VERSION
$binaryPacketOutput->writeUnSignedIntBE(0x01); // 請求ID
$binaryPacketOutput->writeUnSignedShortBE(strlen('test')); // 客戶端標識長度
$binaryPacketOutput->writeString('test'); // 客戶端標識
$binaryPacketOutput->writeUnSignedIntBE(1); // topic列表數組長度
// topic數組元素
$binaryPacketOutput->writeUnSignedShortBE(strlen('test1')); // 寫入2字節topic名稱長度
$binaryPacketOutput->writeString('test1'); // topic名稱
$binaryPacketOutput->flush(); // 輸出緩衝
$packet = $binaryOutputStream->toBinaryString(); // 得到構造好的正文數據包

// 包裝socket連接,得到多數據類型操做能力
$out = new DataOutputStream(new FileOutputStream($client));
$out->writeUnSignedIntBE(strlen($packet)); // 4字節包長度
$out->write($packet); // 包體
$out->flush(); // 輸出到Socket

// 實例化輸入流,從socket讀取數據
$in = new DataInputStream(new FileInputStream($client));
$size = $in->readUnSignedIntBE(); // 4字節包長度
// 一次性讀取完socket數據後關閉,而後將讀取到的響應數據填充到二進制字符串輸入流中,釋放socket
$in = new DataInputStream(new BinaryStringInputStream(fread($client, $size)));
fclose($client);

$requestId = $in->readUnSignedIntBE(); // 4字節請求ID
printf("packet length: %d requestId: %d\n", $size, $requestId);

$brokerCount = $in->readUnSignedIntBE(); // broker數量
for ($i = 0; $i < $brokerCount; $i++) { // 循環讀取broker
    $nodeId = $in->readUnSignedIntBE(); // nodeId
    $hostLength = $in->readUnSignedShortBE(); // host長度
    $host = $in->readString($hostLength); // 主機名
    $port = $in->readUnSignedIntBE(); // port
    $rackLength = $in->readShortBE(); // rack
    $rack = null;
    if ($rackLength != -1) {
        $rack = $in->readString($rackLength);
    }
    printf("nodeId:%d host:%s port:%d rack: %s\n", $nodeId, $host, $port, $rack);
}
$controllerId = $in->readUnSignedIntBE();
printf("controllerId: %d\n", $controllerId);
$topicCount = $in->readUnSignedIntBE();
printf("topic count %d\n", $topicCount);


for ($i = 0; $i < $topicCount; $i++) {
    printf("----topic list----\n");
    $errCode = $in->readUnSignedShortBE();
    $nameLength = $in->readUnSignedShortBE();
    $name = $in->readString($nameLength);
    $isInternal = $in->readUnSignedChar();
    printf("errcode: %d name: %s interval: %d\n", $errCode, $name, $isInternal);

    $partitionCount = $in->readUnSignedIntBE();
    printf("----topic [%s] partition list count %d---\n", $name, $partitionCount);
    for ($j = 0; $j < $partitionCount; $j++) {
        $errCode = $in->readUnSignedShortBE();
        $partitionIndex = $in->readUnSignedIntBE();
        $leaderId = $in->readUnSignedIntBE();
        $replicaNodesCount = $in->readUnSignedIntBE();
        $replicaNodes = [];
        for ($k = 0; $k < $replicaNodesCount; $k++) {
            $replicaNodes[] = $in->readUnSignedIntBE();
        }
        $isrNodeCount = $in->readUnSignedIntBE();
        $isrNodes = [];
        for ($k = 0; $k < $isrNodeCount; $k++) {
            $isrNodes[] = $in->readUnSignedIntBE();
        }
        printf(
            "errcode: %d partitionIndex: %d leaderId: %d replicaNodes: [%s] isrNodes: [%s]\n",
            $errCode,
            $partitionIndex,
            $leaderId,
            join(',', $replicaNodes),
            join(',', $isrNodes)
        );
    }
}

輸出以下:

packet length: 73 requestId: 1
nodeId:0 host:bogon port:9092 rack: 
controllerId: 0
topic count 1
----topic list----
errcode: 0 name: test1 interval: 0
----topic [test1] partition list count 1---
errcode: 0 partitionIndex: 0 leaderId: 0 replicaNodes: [0] isrNodes: [0]

項目地址

php-io

(完)
0.jpeg

相關文章
相關標籤/搜索