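I recently shared "Design and Practice of Application-Layer Private Protocols" (《應用層私有協議的設計和實戰》), which introduced how application-layer private protocols are designed and walked through the data types most often used in protocol design. Today, let's study Kafka's binary protocol.
Kafka's binary protocol defines many data types, including the usual numbers and strings as well as arrays.
This article focuses on fixed-width encodings; variable-length encodings (as in Google Protocol Buffers) are out of scope.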
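Data type | Length in bytes | Description |
---|---|---|
BOOLEAN | 1 | Boolean value |
INT8 | 1 | Single-byte integer, range -2^7 ~ 2^7 - 1 |
INT16 | 2 | Two-byte integer, big-endian, range -2^15 ~ 2^15 - 1 |
INT32 | 4 | Four-byte integer, big-endian, range -2^31 ~ 2^31 - 1 |
INT64 | 8 | Eight-byte integer, big-endian, range -2^63 ~ 2^63 - 1 |
UINT32 | 4 | Four-byte unsigned integer, big-endian, range 0 ~ 2^32 - 1 |
UUID | 16 | 16 bytes; corresponds to the Java UUID type |
STRING | 2+N | A 2-byte header gives the string length N, followed by N bytes of string content |
NULLABLE_STRING | 2+N | A 2-byte header gives the string length N, followed by N bytes of string content; when N is -1 no content follows |
BYTES | 4+N | A 4-byte header gives the byte array length N, followed by N bytes of content |
NULLABLE_BYTES | 4+N | A 4-byte header gives the byte array length N, followed by N bytes of content; when N is -1 no content follows |
ARRAY | 4+N*M | A 4-byte header gives the number of elements N; M is the length of a single element; N = -1 denotes a null array |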
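To make the table concrete, here is a minimal sketch of how a few of these types could be encoded and decoded with PHP's built-in `pack()`/`unpack()`. The helper names (`encodeInt16`, `encodeString`, and so on) are hypothetical and only serve this illustration; the sketch also assumes a 64-bit PHP build so that 32-bit values fit in a native integer.

```php
<?php
// Hypothetical helpers for illustration; only pack()/unpack() are PHP built-ins.

// INT16 / INT32: big-endian, two's complement.
function encodeInt16(int $v): string { return pack('n', $v & 0xFFFF); }
function encodeInt32(int $v): string { return pack('N', $v & 0xFFFFFFFF); }

// STRING: 2-byte length N followed by N bytes of content.
function encodeString(string $s): string { return pack('n', strlen($s)) . $s; }

// NULLABLE_STRING: a length of -1 (0xFFFF on the wire) means null, nothing follows.
function encodeNullableString(?string $s): string {
    return $s === null ? pack('n', 0xFFFF) : encodeString($s);
}

// unpack('n') / unpack('N') yield unsigned values, so fold them back to signed.
function decodeInt16(string $bytes, int $offset = 0): int {
    $v = unpack('n', $bytes, $offset)[1];
    return $v >= 0x8000 ? $v - 0x10000 : $v;
}
function decodeInt32(string $bytes, int $offset = 0): int {
    $v = unpack('N', $bytes, $offset)[1];
    return $v >= 0x80000000 ? $v - 0x100000000 : $v;
}

echo bin2hex(encodeInt32(9092)), "\n";           // 00002384
echo bin2hex(encodeString('test')), "\n";        // 000474657374
echo bin2hex(encodeNullableString(null)), "\n";  // ffff
echo decodeInt16("\xff\xff"), "\n";              // -1
```

The signed/unsigned folding matters for fields such as NULLABLE_STRING lengths, where -1 is a meaningful value.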
Kafka defines quite a few built-in error codes; interested readers can refer to the Kafka error codes list.
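An API key can be thought of as an opcode: the server uses this field to tell which operation the current request is asking for.
We won't go into detail here; interested readers can refer to the Kafka Api Keys list.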
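Next, let's focus on the structure of Kafka's messages.
This article is based on version 1 (V1) of the Kafka protocol; the same approach applies to the other versions.
Kafka's protocol structure is fairly simple: requests and responses share the same overall structure.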
    RequestOrResponse => Size (RequestMessage | ResponseMessage)
      Size => int32
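Expressed as a table:

Field | Type | Description |
---|---|---|
Size | INT32 | Length in bytes of the message that follows (the Size field itself is not counted) |
RequestMessage / ResponseMessage | Size bytes | The request or response itself, starting with its header |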
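The size-prefixed framing can be sketched in a few lines of plain PHP. The function names `frameMessage` and `splitFrame` are made up for this illustration; the only assumption about the protocol is the one stated in the grammar, namely a 4-byte big-endian length followed by that many bytes of message.

```php
<?php
// Hypothetical helpers illustrating the Size-prefixed frame.

// Prefix a message with its 4-byte big-endian length.
function frameMessage(string $message): string {
    return pack('N', strlen($message)) . $message;
}

// Try to take one complete frame off the front of a receive buffer.
// Returns [message, remaining bytes], or null if the frame is not complete yet.
function splitFrame(string $buffer): ?array {
    if (strlen($buffer) < 4) {
        return null; // the Size field has not fully arrived
    }
    $size = unpack('N', $buffer)[1];
    if (strlen($buffer) < 4 + $size) {
        return null; // the message body has not fully arrived
    }
    return [substr($buffer, 4, $size), (string) substr($buffer, 4 + $size)];
}

$frame = frameMessage('abc');
echo bin2hex($frame), "\n"; // 00000003616263

[$message, $rest] = splitFrame($frame . 'xy');
echo $message, ' ', $rest, "\n"; // abc xy
```

Returning null for an incomplete frame lets a caller keep appending socket data to the buffer until a whole message is available.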
Every request packet starts with a fixed request header. Let's take a look.
    Request Header v1 => request_api_key request_api_version correlation_id client_id
      request_api_key => INT16
      request_api_version => INT16
      correlation_id => INT32
      client_id => NULLABLE_STRING
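The grammar above describes the request header. Combined with the overall structure, a request on the wire looks like this:

Field | Type | Description |
---|---|---|
Size | INT32 | Length in bytes of everything that follows |
request_api_key | INT16 | Identifies the API being called (3 for Metadata) |
request_api_version | INT16 | Version of that API |
correlation_id | INT32 | Request ID chosen by the client and echoed back in the response |
client_id | NULLABLE_STRING | Client identifier |
Request body | varies | Defined by the API being called |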
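As a rough illustration, the request header v1 can be assembled with `pack()` alone. The function name `encodeRequestHeaderV1` is hypothetical; the field order and types are taken from the grammar above, and the sample values (API key 3, version 1, request ID 1, client ID `test`) are the ones used by the client later in this article.

```php
<?php
// Hypothetical helper: serialize a Request Header v1.
function encodeRequestHeaderV1(
    int $apiKey,
    int $apiVersion,
    int $correlationId,
    ?string $clientId
): string {
    $header  = pack('n', $apiKey & 0xFFFF);            // request_api_key     => INT16
    $header .= pack('n', $apiVersion & 0xFFFF);        // request_api_version => INT16
    $header .= pack('N', $correlationId & 0xFFFFFFFF); // correlation_id      => INT32
    // client_id => NULLABLE_STRING: length -1 (0xFFFF) encodes null.
    $header .= $clientId === null
        ? pack('n', 0xFFFF)
        : pack('n', strlen($clientId)) . $clientId;
    return $header;
}

echo bin2hex(encodeRequestHeaderV1(3, 1, 1, 'test')), "\n";
// 0003000100000001000474657374
```

Reading the hex left to right gives `0003` (API key), `0001` (version), `00000001` (request ID), `0004` (client ID length) and `74657374` (`test`).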
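    Response Header v1 => correlation_id TAG_BUFFER
      correlation_id => INT32

The response header is simple: it returns the request ID (correlation_id) so the client can match the response to the request it sent. Note that TAG_BUFFER appears only in response header v1, which is used by the flexible API versions; the Metadata v1 response parsed later in this article carries only correlation_id before the response body.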
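A minimal sketch of reading the response header, assuming it carries only the 4-byte correlation_id (as in the Metadata v1 response read later in this article). The function name `decodeResponseHeader` is hypothetical, and a 64-bit PHP build is assumed.

```php
<?php
// Hypothetical helper: split a response message (the bytes after the Size
// field) into its correlation_id and the remaining response body.
function decodeResponseHeader(string $message): array {
    if (strlen($message) < 4) {
        throw new InvalidArgumentException('response shorter than correlation_id');
    }
    $id = unpack('N', $message)[1];
    if ($id >= 0x80000000) {
        $id -= 0x100000000; // correlation_id is a signed INT32
    }
    return [$id, (string) substr($message, 4)];
}

$sentId = 1;
[$receivedId, $body] = decodeResponseHeader(pack('N', 1) . 'body');

// A client would typically reject a response whose ID does not match.
if ($receivedId !== $sentId) {
    throw new RuntimeException('unexpected correlation_id');
}
echo $receivedId, ' ', $body, "\n"; // 1 body
```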
The protocol format of the Kafka Metadata API is as follows:
    Metadata Request (Version: 1) => [topics]
      topics => name
        name => STRING
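Expressed as a table:

Field | Type | Description |
---|---|---|
topics | ARRAY | A 4-byte element count, followed by one entry per topic |
name | STRING | Topic name: a 2-byte length followed by the name itself |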
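The request body can be sketched the same way. `encodeMetadataRequestV1` is a hypothetical name; the layout follows the grammar above. The handling of `null` reflects my understanding that, from Metadata v1 onward, a null array (length -1) asks for all topics while an empty array asks for none; treat that as an assumption to verify against the Kafka protocol guide.

```php
<?php
// Hypothetical helper: serialize the body of a Metadata Request v1.
// $topics === null is encoded as a null ARRAY (length -1).
function encodeMetadataRequestV1(?array $topics): string {
    if ($topics === null) {
        return pack('N', 0xFFFFFFFF);        // ARRAY length -1
    }
    $body = pack('N', count($topics));       // ARRAY length N
    foreach ($topics as $name) {
        $body .= pack('n', strlen($name)) . $name; // name => STRING
    }
    return $body;
}

echo bin2hex(encodeMetadataRequestV1(['test1'])), "\n"; // 0000000100057465737431
echo bin2hex(encodeMetadataRequestV1(null)), "\n";      // ffffffff
```

Concatenating the request header, then this body, and prefixing the result with its 4-byte length yields the complete packet that is written to the socket.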
    Metadata Response (Version: 1) => [brokers] controller_id [topics]
      brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => NULLABLE_STRING
      controller_id => INT32
      topics => error_code name is_internal [partitions]
        error_code => INT16
        name => STRING
        is_internal => BOOLEAN
        partitions => error_code partition_index leader_id [replica_nodes] [isr_nodes]
          error_code => INT16
          partition_index => INT32
          leader_id => INT32
          replica_nodes => INT32
          isr_nodes => INT32
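brokers: an ARRAY; a 4-byte integer gives the number of elements (the broker count)
topics: an ARRAY of topics
partitions: an ARRAY of the partitions that belong to the topic
replica_nodes: an ARRAY of INT32 node IDs
isr_nodes: an ARRAY of INT32 node IDs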
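Every array in the response is read the same way: read the 4-byte element count, then loop. The sketch below shows this for the brokers array using a small cursor class. `ByteReader` and `readBrokers` are hypothetical names written for this illustration (they are not part of any library mentioned in this article), and the code assumes PHP 7.4+ on a 64-bit build.

```php
<?php
// Hypothetical cursor over a binary string.
final class ByteReader {
    private string $data;
    private int $pos = 0;

    public function __construct(string $data) { $this->data = $data; }

    public function int16(): int {
        $v = unpack('n', $this->take(2))[1];
        return $v >= 0x8000 ? $v - 0x10000 : $v;
    }

    public function int32(): int {
        $v = unpack('N', $this->take(4))[1];
        return $v >= 0x80000000 ? $v - 0x100000000 : $v;
    }

    public function string(): string { return $this->take($this->int16()); }

    public function nullableString(): ?string {
        $n = $this->int16();
        return $n < 0 ? null : $this->take($n); // -1 means null
    }

    private function take(int $n): string {
        if ($n < 0 || $this->pos + $n > strlen($this->data)) {
            throw new UnderflowException('not enough bytes');
        }
        $s = (string) substr($this->data, $this->pos, $n);
        $this->pos += $n;
        return $s;
    }
}

// brokers => node_id host port rack, preceded by a 4-byte element count.
function readBrokers(ByteReader $r): array {
    $brokers = [];
    $count = $r->int32();
    for ($i = 0; $i < $count; $i++) {
        $brokers[] = [
            'node_id' => $r->int32(),
            'host'    => $r->string(),
            'port'    => $r->int32(),
            'rack'    => $r->nullableString(),
        ];
    }
    return $brokers;
}

// Hand-built sample: one broker, node 0, host "bogon", port 9092, rack null.
$bytes = pack('N', 1) . pack('N', 0) . pack('n', 5) . 'bogon'
       . pack('N', 9092) . pack('n', 0xFFFF);
var_dump(readBrokers(new ByteReader($bytes)));
```

The topics, partitions, replica_nodes and isr_nodes arrays could be read with the same count-then-loop pattern, nesting one loop inside another where the grammar nests.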
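Other request types can be analyzed in the same way.
PHP ships with the pack/unpack functions for working with binary data, but they are not particularly easy to use.
For binary data, Java has byte[] and Go has []byte. PHP has no dedicated type and stores binary data in strings, but PHP strings are binary-safe.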
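A short illustration of both points, using only built-in functions: a PHP string holds arbitrary bytes (including NUL) without truncation, and `pack()`/`unpack()` describe the layout through a compact format string that must be kept in sync with the data by hand. The field names in the `unpack()` format are chosen for this example.

```php
<?php
// A PHP string is just a byte sequence: NUL bytes and 0xFF are stored as-is.
$header = pack('nnN', 3, 1, 1); // two big-endian 16-bit values, one 32-bit value

echo strlen($header), "\n";     // 8
echo bin2hex($header), "\n";    // 0003000100000001

// unpack() needs the same layout spelled out again, with a name per field.
$fields = unpack('napi_key/napi_version/Ncorrelation_id', $header);
print_r($fields);
// Array ( [api_key] => 3 [api_version] => 1 [correlation_id] => 1 )

// Individual bytes are reachable by offset.
echo ord($header[1]), "\n";     // 3
```

Format strings like these become hard to maintain for variable-length fields such as STRING, which is where a stream-style reader and writer is more comfortable.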
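To address the usability of pack/unpack, I spent the last couple of days writing a simple io library, modeled on Java's IO system, that simplifies working with binary data streams (the repository address is at the end of the article).
Next, we use this library to write a Kafka client.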
    <?php
    /**
     * Read the Kafka broker list.
     */
    require __DIR__ . '/../vendor/autoload.php';

    use io\BinaryStringInputStream;
    use io\BinaryStringOutputStream;
    use io\DataInputStream;
    use io\DataOutputStream;
    use io\FileInputStream;
    use io\FileOutputStream;

    $client = stream_socket_client('tcp://127.0.0.1:9092', $errno, $errstr, 5);
    if ($client === false) { // stream_socket_client() returns false on failure
        die($errstr);
    }

    // Build the request header and body in memory first, so the total length is known.
    $binaryOutputStream = new BinaryStringOutputStream();
    $binaryPacketOutput = new DataOutputStream($binaryOutputStream);
    $binaryPacketOutput->writeUnSignedShortBE(0x03);            // request_api_key: Metadata
    $binaryPacketOutput->writeUnSignedShortBE(1);               // request_api_version
    $binaryPacketOutput->writeUnSignedIntBE(0x01);              // correlation_id (request ID)
    $binaryPacketOutput->writeUnSignedShortBE(strlen('test'));  // client_id length
    $binaryPacketOutput->writeString('test');                   // client_id
    $binaryPacketOutput->writeUnSignedIntBE(1);                 // number of elements in the topics array
    // topics array element
    $binaryPacketOutput->writeUnSignedShortBE(strlen('test1')); // 2-byte topic name length
    $binaryPacketOutput->writeString('test1');                  // topic name
    $binaryPacketOutput->flush();                               // flush the buffer
    $packet = $binaryOutputStream->toBinaryString();            // the assembled message

    // Wrap the socket to get typed write operations.
    $out = new DataOutputStream(new FileOutputStream($client));
    $out->writeUnSignedIntBE(strlen($packet)); // 4-byte packet length
    $out->write($packet);                      // packet body
    $out->flush();                             // send to the socket

    // Create an input stream and read from the socket.
    $in = new DataInputStream(new FileInputStream($client));
    $size = $in->readUnSignedIntBE(); // 4-byte packet length

    // Read the whole response, then release the socket. fread() may return
    // fewer bytes than requested, so keep reading until $size bytes have arrived.
    $payload = '';
    while (strlen($payload) < $size) {
        $chunk = fread($client, $size - strlen($payload));
        if ($chunk === false || $chunk === '') {
            break; // connection closed or read error
        }
        $payload .= $chunk;
    }
    fclose($client);

    // Parse the response from an in-memory binary string input stream.
    $in = new DataInputStream(new BinaryStringInputStream($payload));
    $requestId = $in->readUnSignedIntBE(); // 4-byte correlation_id
    printf("packet length: %d requestId: %d\n", $size, $requestId);

    $brokerCount = $in->readUnSignedIntBE(); // number of brokers
    for ($i = 0; $i < $brokerCount; $i++) {  // read each broker
        $nodeId = $in->readUnSignedIntBE();       // node_id
        $hostLength = $in->readUnSignedShortBE(); // host length
        $host = $in->readString($hostLength);     // host name
        $port = $in->readUnSignedIntBE();         // port
        $rackLength = $in->readShortBE();         // rack length, -1 means null
        $rack = null;
        if ($rackLength != -1) {
            $rack = $in->readString($rackLength);
        }
        printf("nodeId:%d host:%s port:%d rack: %s\n", $nodeId, $host, $port, $rack);
    }

    $controllerId = $in->readUnSignedIntBE();
    printf("controllerId: %d\n", $controllerId);

    $topicCount = $in->readUnSignedIntBE();
    printf("topic count %d\n", $topicCount);
    for ($i = 0; $i < $topicCount; $i++) {
        printf("----topic list----\n");
        $errCode = $in->readShortBE(); // error_code is a signed INT16
        $nameLength = $in->readUnSignedShortBE();
        $name = $in->readString($nameLength);
        $isInternal = $in->readUnSignedChar();
        printf("errcode: %d name: %s interval: %d\n", $errCode, $name, $isInternal);

        $partitionCount = $in->readUnSignedIntBE();
        printf("----topic [%s] partition list count %d---\n", $name, $partitionCount);
        for ($j = 0; $j < $partitionCount; $j++) {
            $errCode = $in->readShortBE(); // error_code is a signed INT16
            $partitionIndex = $in->readUnSignedIntBE();
            $leaderId = $in->readUnSignedIntBE();

            $replicaNodesCount = $in->readUnSignedIntBE();
            $replicaNodes = [];
            for ($k = 0; $k < $replicaNodesCount; $k++) {
                $replicaNodes[] = $in->readUnSignedIntBE();
            }

            $isrNodeCount = $in->readUnSignedIntBE();
            $isrNodes = [];
            for ($k = 0; $k < $isrNodeCount; $k++) {
                $isrNodes[] = $in->readUnSignedIntBE();
            }

            printf(
                "errcode: %d partitionIndex: %d leaderId: %d replicaNodes: [%s] isrNodes: [%s]\n",
                $errCode,
                $partitionIndex,
                $leaderId,
                join(',', $replicaNodes),
                join(',', $isrNodes)
            );
        }
    }
The output is as follows:
    packet length: 73 requestId: 1
    nodeId:0 host:bogon port:9092 rack: 
    controllerId: 0
    topic count 1
    ----topic list----
    errcode: 0 name: test1 interval: 0
    ----topic [test1] partition list count 1---
    errcode: 0 partitionIndex: 0 leaderId: 0 replicaNodes: [0] isrNodes: [0]
(End)