使用netty徒手擼一個簡單的kafkaClient

使用netty徒手擼一個ZkClientjava

使用netty徒手擼一個RedisClientgit

前兩天博文咱們介紹瞭如何使用netty徒手擼一個kafka的客戶端. 所謂的kafka客戶端就是kafka的producer和consumer了.github

吐槽一下kafka的api設計

你們都知道, kafka的客戶端是重構過一版的. 以前0.8的producer和consumer是使用scala開發的,後來由於各類緣由, 實在是改不動了. 到了0.9版本的時候,使用java重構了kafka的客戶端.bootstrap

雖然如今java版的客戶端還在普遍使用,並且沒有什麼太大的性能問題. 可是根據我這些天對kafka客戶端的api的研究, 我總以爲, 總有一天, kafka的客戶端還得來一次完全的重構.由於什麼呢? 由於實在是太--亂--啦:api

1. 多版本問題.
   每一個api都有好幾個版本, 可是每一個api使用的版本都不一致. 
   舉個例子, 在kafka-client 1.0.0中,broker的版本是2.3.0時:
   METADATA(拉取topic元數據)的api有1個version, 當前使用版本是1.
   PRODUCE(生產消息)的api有6個version, 當前使用版本是6
   FETCH(拉取消息)的api有5個version, 當前使用版本是5

2. 報文的數據結構巨複雜
   等下實現生產消息的報文的時候,大家會看到,這個報文嵌套了6層,即有6個子結構體.
複製代碼

以上我對kafka api的小小吐槽. 固然也多是我水平不夠,未能理解到它這麼設計的用意和深意~~bash

kafka的消息格式

kafka一直在不斷地優化自身, 所以它的消息格式也是一直在變.數據結構

在<<Apache kafka實戰 (胡夕著)>> 一書中(基於kafka 1.0.0), 做者介紹了到目前爲止, 一共有3種消息格式V0,V1,V2. 其中V0和V1因爲各類弊端, 早就逐漸的被淘汰了. 如今新版kafka使用的都是V2版本的消息格式. 本文就是在kafka2.3.0上實現的, 使用V2格式的消息能測試經過.併發

所以這裏介紹的消息格式都是V2版本的.post

在開始介紹kafka的消息格式以前, 你們還要理解一個概念: 可變長度.
常規的長度字段要麼就是使用4個字節,要麼就是使用8個字節來表示,
總之這個字段使用的字節數通常都是固定的.
可是在kafka的v2版本的消息裏就不同了.
它參考了Zig-zag的編碼方式, 可使用不一樣長度的字段來表示不一樣的數值.

簡單來講就是這樣:

用 0 來表示 0
用 1 來表示 1
用 2 來表示 -1
用 3 來表示 2
用 4 來表示 -2
.....

這樣的好處就是能夠用比較少的字節數來表示絕對值比較小的數字, 
不用每一個數字都佔用4個或8個字節, 從而能夠節省很大的空間

複製代碼

瞭解了"可變長度"這個概念後就能夠來看kafka的v2版本的消息格式了. 以下圖(截圖自<<Apache kafka實戰 (胡夕著)>> 一書):性能

image
咱們來一個一個瞭解這些字段

1. 消息總長度. 顧名思義, 就是這條消息的總長度啦. 用的是Zig-zag編碼表示
  2. 屬性. 一個字節表示(8位), 其中第三位用來表示壓縮方式.高5位保留,沒有用到
  因爲我這裏的實現沒有用到壓縮,因此這個字段老是0
  3. 時間戳增量.也是用Zig-zag編碼. 所謂增量, 是指針對該消息batch的第一條消息的時間戳的增量.
  消息batch接下來會介紹.
  4. 位移增量. 跟時間戳增量含義差很少
  5. key length. 每條kafka消息均可以有key, 這個就表示key的字節數
  6. key. 這個字段就是kafka消息裏面的key.
  7. value size. 更key length含義差很少
  8  value. 就是kafka消息的內容
  9. header個數. kafka消息也能夠帶有header
  10. header. kafka的header
複製代碼

看到第3和第4個字段是否是有點一臉懵?不要緊, 繼續往下看你就明白了.

kafka發送消息的時候並非有一條發送一條的, 而是把多條消息集中在一塊兒, 而後再一併發送的. 這就是所謂的kafka 消息batch.

並且這消息batch發送到kafka的broker以後, 它也一樣不會拆開, 而是原封不動地把這個消息batch發給消費者,或存儲到日誌文件中.

所以理解這個消息batch對咱們實現發送消息和消費消息都是必要的.

消息batch的格式以下圖所示:

image

是否是一會兒有點奔潰, 一會兒冒出了這麼多的字段. 沒得辦法, 咱們再來一個個地看.

首先最後的"消息"就是上面介紹的v2版本格式的消息,可能會有x個, x就是倒數第二個字段"消息個數".

剩下的字段:

1. 起始位移
   最後面的"消息"中第一條消息的位移offset
2. 長度
   表示接下來的報文的長度, 即"消息batch的總長度" - 8Byte(起始位移字段) - 4Byte(長度字段)
3. 分區leader版本號
   我這裏的實現寫死爲-1
4. 版本號
   就是magic. 咱們這裏是V2,因此是2
5. CRC
   是指接下來的全部字段的CRC碼
6. 屬性
   跟上面消息中的屬性的含義一致
7. 最大位移增量
   就是最後一條消息的"位置增量"的值
8. 起始時間戳
   就是第一條消息的時間戳
9. 最大時間戳
   最後一條消息的時間戳
10. 後面三個pid epoch, seq三個字段都是跟事務等相關的,咱們這裏沒有用到, 因此都寫死爲-1   
複製代碼

這裏的"消息"和"消息batch"我在代碼中定義的bean分別是KafkaMsgRecordV2和KafkaMsgRecordBatch. 若是看上面的文字和圖片確實很差理解的話, 能夠跟着代碼看, 或者能夠理解得更加深入. 代碼請見文末的github地址.

固然若是你理解了這一段, 那很好.不過也別開心太早.由於上面說了, kafak發送消息的數據結構嵌套了6層, 而這裏才兩層. 也就是還有4層等着咱們去理解. 固然, 那4層相對是比較簡單的. 最難理解的部分已通過去了

requestHeader和responseHeader

kafka每一個api的請求都必須帶有一個請求的header, 而每一個api的響應體中也都帶有一個響應的header.requestHeader和responseHeader分別如圖所示:

image

image

響應的header比較簡單, 就是一個correlationId,這個id實際上是客戶端發送給服務端, 服務端原封不動的返回了. 做用跟zookeeper的xid同樣.

咱們來看看requestHeader

apikey 和 apiVersion

public enum ApiKeys {
    /**
     * 發送消息
     */
    PRODUCE(0, "Produce", (short) 5),

    /**
     * fetch 消息
     */
    FETCH(1, "Fetch", (short)6),
    /**
     * 拉取元數據
     */
    METADATA(3, "Metadata", (short) 1);

    public final short id;

    public final String name;

    public short apiVersion;

    ApiKeys(int id, String name, short apiVersion) {
        this.id = (short) id;
        this.name = name;
        this.apiVersion = apiVersion;
    }


}
複製代碼

代碼中的id字段就是apiKey, apiVersion對應的就是header中的apiVersion. 正如咱們開頭吐槽的同樣, 每一個api的版本都是不同的. 在此次實現裏, 我只實現了3個api. 但實際上kafka提供十幾個api.

correlationId

關聯性Id和zkClient中的xid做用是同樣的, 主要是把請求和響應對應起來. kafka的響應報文中會包含這個字段.

clientIdLen和clientId

不論是kafka生產者仍是消費者, 都須要指定一個clientId. 在官方的客戶端中,若是咱們不指定的話, 也會自動生成一個clientId.

最後值得一提的是, 這裏的clientIdLen是用兩個字節表示的. kafka裏面都是用2個字節表示字符串長度的. 這個跟zookeeper裏面是不同的.

生產者

生產者的邏輯實如今KafkaClient的send方法:

public ProduceResponse send(KafkaProduceConfig config, String topic , String key, String val)
複製代碼

正如上面一直提到的, 生產者的請求報文一共嵌套了6層, 具體表現爲:

1. ProduceRequest繼承KafkaRequestHeader, 持有TopicProduceData對象
  2. TopicProduceData 持有PartitionData對象
  3. PartitionData持有Record對象
  4. Record持有KafkaMsgRecordBatch對象
  5. KafkaMsgRecordBatch持有KafkaMsgRecordV2對象
複製代碼

能夠看到, 實際上是以"broker信息" => "topic信息" =>"分區信息" => "記錄信息" => "消息batch" => "消息"等層次逐漸包裝的.

報文的的字段和圖示這裏就再也不給出了,有興趣的同窗能夠跟一下代碼, 直接從序列化入手, 就能夠理解kafka生產者的通信協議了, 大致邏輯以下所示:

- ProduceRequest.serializable()
- KafkaRequestHeader.serializable()
    - TopicProduceData.serializable()
        - PartitionData.serializable()
            - Record.serializable()
               - KafkaMsgRecordBatch.serializable()
                   - KafkaMsgRecordV2.serializable()
複製代碼

通過上面的一系列serializable, 最終把一個ProduceRequest對象轉換成一個ByteBuf,發往kafka的broker, 一條消息就成功的產生了.

消費者

生產者的邏輯實如今KafkaClient的poll方法:

public Map<Integer, List<ConsumerRecord>> poll(KafkaConsumerConfig consumerConfig, String topic, int partition, long fetchOffset)
複製代碼

相對於生產者來講, 消費者的請求報文相對簡單,也是一個從"broker配置"=>"topic信息" => "分區信息"的包裝過程

以下所示:

1. FetchRequest 繼承KafkaRequestHeader, 持有FetchTopicRequest對象
2. FetchTopicRequest持有FetchTopicPartitionRequest對象

複製代碼

然而, 消費者的響應體就相對比生產者的響應體要複雜的多了.

由於上面說過, 生產者發送broker的"消息batch", broker是不會把它解析成具體的消息的. 並且原封不動地把它保存到日誌中去, 從而也是原封不動的被消費者消費到. 所以這個解析消息的工做天然而然地就落到了消費者的肩上.

具體請參見KafkaClient#parseResp()方法

代碼運行

和以前的ZkClient和RedisClient同樣, 這裏也一樣實現了一個kafkaClientTest,方便體驗和調試.

此次針對了幾種場景進行測試:

  1. 在kafkaClientTest中生產消息, 利用kafka自帶的kafka-console-consumer.sh 進行消費

生產消息:

private final static String host = "localhost";
    private final static int port = 9092;
    private final static String topic = "testTopic1";
 @Test
    public void testProducer(){
        KafkaClient kafkaClient = new KafkaClient("producer-111", host, port);
        KafkaProduceConfig kafkaConfig = new KafkaProduceConfig();
        // 注意這裏設置爲0時, broker不會響應任何數據, 可是消息其實是發送到broker了的
        short ack = -1;
        kafkaConfig.setAck(ack);
        kafkaConfig.setTimeout(30000);
        ProduceResponse response  = kafkaClient.send(kafkaConfig, topic,"testKey","helloWorld1113");
        assert ack == 0 || response != null;
        System.out.println(new Gson().toJson(response));
    }
複製代碼

能夠在控制檯看到消息被消費了:

lhhMacBook-Air:bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic1
helloWorld1113
複製代碼
  1. 在kafkaClientTest中生產消息(場景1的消息), 在kafkaClientTest消費消息:
private final static String host = "localhost";
    private final static int port = 9092;
    private final static String topic = "testTopic1";
   @Test
    public void testConsumer(){
        // 若是broker上不存在這個topic的話, 直接消費可能會報錯, 能夠fetch一下metadata, 或先生產消息
        // testMetaData();
        // testProducer();
        KafkaClient kafkaClient = new KafkaClient("consumer-111", host, port);
        KafkaConsumerConfig consumerConfig = new KafkaConsumerConfig();
        consumerConfig.setMaxBytes(Integer.MAX_VALUE);
        consumerConfig.setMaxWaitTime(30000);
        consumerConfig.setMinBytes(1);
        Map<Integer, List<ConsumerRecord>>  response = kafkaClient.poll(consumerConfig, topic, 0, 0L);
        assert response != null && response.size() > 0;
        Set<Map.Entry<Integer, List<ConsumerRecord>>> entrySet =response.entrySet();
        for(Map.Entry<Integer, List<ConsumerRecord>> entry : entrySet){
            Integer partition = entry.getKey();
            System.out.println("partition" + partition + "的數據:");
            for(ConsumerRecord consumerRecord : entry.getValue()){
                System.out.println(new Gson().toJson(consumerRecord));
            }
        }

    }
複製代碼

控制檯打印出剛剛生產的消息(包含了以前測試的消息), 說明消費成功:

partition0的數據:
{"offset":0,"timeStamp":1573896186007,"key":"testKey","val":"helloWorld"}
{"offset":1,"timeStamp":1573896202787,"key":"testKey","val":"helloWorld"}
{"offset":2,"timeStamp":1573896309808,"key":"testKey","val":"helloWorld111"}
{"offset":3,"timeStamp":1573899639313,"key":"testKey","val":"helloWorld1113"}
{"offset":4,"timeStamp":1574011584095,"key":"testKey","val":"helloWorld1113"}
複製代碼
  1. 利用kafka-console-producer.sh生產消息, 在kafkaClientTest消費消息:

生產消息:

lhhMacBook-Air:bin$ sh kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic222
>hi
>h
複製代碼

消費消息輸出, 說明消費成功

partition0的數據:
{"offset":0,"timeStamp":1574012251856,"val":"hi"}
{"offset":1,"timeStamp":1574012270368,"val":"h"}
複製代碼

源碼

最後附上github源碼地址:

github.com/NorthWard/a…

感興趣的同窗能夠參考一下,共同窗習進步.

相關文章
相關標籤/搜索