前兩天博文咱們介紹瞭如何使用netty徒手擼一個kafka的客戶端. 所謂的kafka客戶端就是kafka的producer和consumer了.github
你們都知道, kafka的客戶端是重構過一版的. 以前0.8的producer和consumer是使用scala開發的,後來由於各類緣由, 實在是改不動了. 到了0.9版本的時候,使用java重構了kafka的客戶端.bootstrap
雖然如今java版的客戶端還在普遍使用,並且沒有什麼太大的性能問題. 可是根據我這些天對kafka客戶端的api的研究, 我總以爲, 總有一天, kafka的客戶端還得來一次完全的重構.由於什麼呢? 由於實在是太--亂--啦:api
1. 多版本問題.
每一個api都有好幾個版本, 可是每一個api使用的版本都不一致.
舉個例子, 在kafka-client 1.0.0中,broker的版本是2.3.0時:
METADATA(拉取topic元數據)的api有1個version, 當前使用版本是1.
PRODUCE(生產消息)的api有6個version, 當前使用版本是6
FETCH(拉取消息)的api有5個version, 當前使用版本是5
2. 報文的數據結構巨複雜
等下實現生產消息的報文的時候,大家會看到,這個報文嵌套了6層,即有6個子結構體.
複製代碼
以上我對kafka api的小小吐槽. 固然也多是我水平不夠,未能理解到它這麼設計的用意和深意~~bash
kafka一直在不斷地優化自身, 所以它的消息格式也是一直在變.數據結構
在<<Apache kafka實戰 (胡夕著)>> 一書中(基於kafka 1.0.0), 做者介紹了到目前爲止, 一共有3種消息格式V0,V1,V2. 其中V0和V1因爲各類弊端, 早就逐漸的被淘汰了. 如今新版kafka使用的都是V2版本的消息格式. 本文就是在kafka2.3.0上實現的, 使用V2格式的消息能測試經過.併發
所以這裏介紹的消息格式都是V2版本的.post
在開始介紹kafka的消息格式以前, 你們還要理解一個概念: 可變長度.
常規的長度字段要麼就是使用4個字節,要麼就是使用8個字節來表示,
總之這個字段使用的字節數通常都是固定的.
可是在kafka的v2版本的消息裏就不同了.
它參考了Zig-zag的編碼方式, 可使用不一樣長度的字段來表示不一樣的數值.
簡單來講就是這樣:
用 0 來表示 0
用 1 來表示 1
用 2 來表示 -1
用 3 來表示 2
用 4 來表示 -2
.....
這樣的好處就是能夠用比較少的字節數來表示絕對值比較小的數字,
不用每一個數字都佔用4個或8個字節, 從而能夠節省很大的空間
複製代碼
瞭解了"可變長度"這個概念後就能夠來看kafka的v2版本的消息格式了. 以下圖(截圖自<<Apache kafka實戰 (胡夕著)>> 一書):性能
1. 消息總長度. 顧名思義, 就是這條消息的總長度啦. 用的是Zig-zag編碼表示
2. 屬性. 一個字節表示(8位), 其中第三位用來表示壓縮方式.高5位保留,沒有用到
因爲我這裏的實現沒有用到壓縮,因此這個字段老是0
3. 時間戳增量.也是用Zig-zag編碼. 所謂增量, 是指針對該消息batch的第一條消息的時間戳的增量.
消息batch接下來會介紹.
4. 位移增量. 跟時間戳增量含義差很少
5. key length. 每條kafka消息均可以有key, 這個就表示key的字節數
6. key. 這個字段就是kafka消息裏面的key.
7. value size. 更key length含義差很少
8 value. 就是kafka消息的內容
9. header個數. kafka消息也能夠帶有header
10. header. kafka的header
複製代碼
看到第3和第4個字段是否是有點一臉懵?不要緊, 繼續往下看你就明白了.
kafka發送消息的時候並非有一條發送一條的, 而是把多條消息集中在一塊兒, 而後再一併發送的. 這就是所謂的kafka 消息batch.
並且這消息batch發送到kafka的broker以後, 它也一樣不會拆開, 而是原封不動地把這個消息batch發給消費者,或存儲到日誌文件中.
所以理解這個消息batch對咱們實現發送消息和消費消息都是必要的.
消息batch的格式以下圖所示:
是否是一會兒有點奔潰, 一會兒冒出了這麼多的字段. 沒得辦法, 咱們再來一個個地看.
首先最後的"消息"就是上面介紹的v2版本格式的消息,可能會有x個, x就是倒數第二個字段"消息個數".
剩下的字段:
1. 起始位移
最後面的"消息"中第一條消息的位移offset
2. 長度
表示接下來的報文的長度, 即"消息batch的總長度" - 8Byte(起始位移字段) - 4Byte(長度字段)
3. 分區leader版本號
我這裏的實現寫死爲-1
4. 版本號
就是magic. 咱們這裏是V2,因此是2
5. CRC
是指接下來的全部字段的CRC碼
6. 屬性
跟上面消息中的屬性的含義一致
7. 最大位移增量
就是最後一條消息的"位置增量"的值
8. 起始時間戳
就是第一條消息的時間戳
9. 最大時間戳
最後一條消息的時間戳
10. 後面三個pid epoch, seq三個字段都是跟事務等相關的,咱們這裏沒有用到, 因此都寫死爲-1
複製代碼
這裏的"消息"和"消息batch"我在代碼中定義的bean分別是KafkaMsgRecordV2和KafkaMsgRecordBatch. 若是看上面的文字和圖片確實很差理解的話, 能夠跟着代碼看, 或者能夠理解得更加深入. 代碼請見文末的github地址.
固然若是你理解了這一段, 那很好.不過也別開心太早.由於上面說了, kafak發送消息的數據結構嵌套了6層, 而這裏才兩層. 也就是還有4層等着咱們去理解. 固然, 那4層相對是比較簡單的. 最難理解的部分已通過去了
kafka每一個api的請求都必須帶有一個請求的header, 而每一個api的響應體中也都帶有一個響應的header.requestHeader和responseHeader分別如圖所示:
響應的header比較簡單, 就是一個correlationId,這個id實際上是客戶端發送給服務端, 服務端原封不動的返回了. 做用跟zookeeper的xid同樣.
咱們來看看requestHeader
public enum ApiKeys {
/**
* 發送消息
*/
PRODUCE(0, "Produce", (short) 5),
/**
* fetch 消息
*/
FETCH(1, "Fetch", (short)6),
/**
* 拉取元數據
*/
METADATA(3, "Metadata", (short) 1);
public final short id;
public final String name;
public short apiVersion;
ApiKeys(int id, String name, short apiVersion) {
this.id = (short) id;
this.name = name;
this.apiVersion = apiVersion;
}
}
複製代碼
代碼中的id字段就是apiKey, apiVersion對應的就是header中的apiVersion. 正如咱們開頭吐槽的同樣, 每一個api的版本都是不同的. 在此次實現裏, 我只實現了3個api. 但實際上kafka提供十幾個api.
關聯性Id和zkClient中的xid做用是同樣的, 主要是把請求和響應對應起來. kafka的響應報文中會包含這個字段.
不論是kafka生產者仍是消費者, 都須要指定一個clientId. 在官方的客戶端中,若是咱們不指定的話, 也會自動生成一個clientId.
最後值得一提的是, 這裏的clientIdLen是用兩個字節表示的. kafka裏面都是用2個字節表示字符串長度的. 這個跟zookeeper裏面是不同的.
生產者的邏輯實如今KafkaClient的send方法:
public ProduceResponse send(KafkaProduceConfig config, String topic , String key, String val)
複製代碼
正如上面一直提到的, 生產者的請求報文一共嵌套了6層, 具體表現爲:
1. ProduceRequest繼承KafkaRequestHeader, 持有TopicProduceData對象
2. TopicProduceData 持有PartitionData對象
3. PartitionData持有Record對象
4. Record持有KafkaMsgRecordBatch對象
5. KafkaMsgRecordBatch持有KafkaMsgRecordV2對象
複製代碼
能夠看到, 實際上是以"broker信息" => "topic信息" =>"分區信息" => "記錄信息" => "消息batch" => "消息"等層次逐漸包裝的.
報文的的字段和圖示這裏就再也不給出了,有興趣的同窗能夠跟一下代碼, 直接從序列化入手, 就能夠理解kafka生產者的通信協議了, 大致邏輯以下所示:
- ProduceRequest.serializable()
- KafkaRequestHeader.serializable()
- TopicProduceData.serializable()
- PartitionData.serializable()
- Record.serializable()
- KafkaMsgRecordBatch.serializable()
- KafkaMsgRecordV2.serializable()
複製代碼
通過上面的一系列serializable, 最終把一個ProduceRequest對象轉換成一個ByteBuf,發往kafka的broker, 一條消息就成功的產生了.
生產者的邏輯實如今KafkaClient的poll方法:
public Map<Integer, List<ConsumerRecord>> poll(KafkaConsumerConfig consumerConfig, String topic, int partition, long fetchOffset)
複製代碼
相對於生產者來講, 消費者的請求報文相對簡單,也是一個從"broker配置"=>"topic信息" => "分區信息"的包裝過程
以下所示:
1. FetchRequest 繼承KafkaRequestHeader, 持有FetchTopicRequest對象
2. FetchTopicRequest持有FetchTopicPartitionRequest對象
複製代碼
然而, 消費者的響應體就相對比生產者的響應體要複雜的多了.
由於上面說過, 生產者發送broker的"消息batch", broker是不會把它解析成具體的消息的. 並且原封不動地把它保存到日誌中去, 從而也是原封不動的被消費者消費到. 所以這個解析消息的工做天然而然地就落到了消費者的肩上.
具體請參見KafkaClient#parseResp()方法
和以前的ZkClient和RedisClient同樣, 這裏也一樣實現了一個kafkaClientTest,方便體驗和調試.
此次針對了幾種場景進行測試:
生產消息:
private final static String host = "localhost";
private final static int port = 9092;
private final static String topic = "testTopic1";
@Test
public void testProducer(){
KafkaClient kafkaClient = new KafkaClient("producer-111", host, port);
KafkaProduceConfig kafkaConfig = new KafkaProduceConfig();
// 注意這裏設置爲0時, broker不會響應任何數據, 可是消息其實是發送到broker了的
short ack = -1;
kafkaConfig.setAck(ack);
kafkaConfig.setTimeout(30000);
ProduceResponse response = kafkaClient.send(kafkaConfig, topic,"testKey","helloWorld1113");
assert ack == 0 || response != null;
System.out.println(new Gson().toJson(response));
}
複製代碼
能夠在控制檯看到消息被消費了:
lhhMacBook-Air:bin$ sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic1
helloWorld1113
複製代碼
private final static String host = "localhost";
private final static int port = 9092;
private final static String topic = "testTopic1";
@Test
public void testConsumer(){
// 若是broker上不存在這個topic的話, 直接消費可能會報錯, 能夠fetch一下metadata, 或先生產消息
// testMetaData();
// testProducer();
KafkaClient kafkaClient = new KafkaClient("consumer-111", host, port);
KafkaConsumerConfig consumerConfig = new KafkaConsumerConfig();
consumerConfig.setMaxBytes(Integer.MAX_VALUE);
consumerConfig.setMaxWaitTime(30000);
consumerConfig.setMinBytes(1);
Map<Integer, List<ConsumerRecord>> response = kafkaClient.poll(consumerConfig, topic, 0, 0L);
assert response != null && response.size() > 0;
Set<Map.Entry<Integer, List<ConsumerRecord>>> entrySet =response.entrySet();
for(Map.Entry<Integer, List<ConsumerRecord>> entry : entrySet){
Integer partition = entry.getKey();
System.out.println("partition" + partition + "的數據:");
for(ConsumerRecord consumerRecord : entry.getValue()){
System.out.println(new Gson().toJson(consumerRecord));
}
}
}
複製代碼
控制檯打印出剛剛生產的消息(包含了以前測試的消息), 說明消費成功:
partition0的數據:
{"offset":0,"timeStamp":1573896186007,"key":"testKey","val":"helloWorld"}
{"offset":1,"timeStamp":1573896202787,"key":"testKey","val":"helloWorld"}
{"offset":2,"timeStamp":1573896309808,"key":"testKey","val":"helloWorld111"}
{"offset":3,"timeStamp":1573899639313,"key":"testKey","val":"helloWorld1113"}
{"offset":4,"timeStamp":1574011584095,"key":"testKey","val":"helloWorld1113"}
複製代碼
生產消息:
lhhMacBook-Air:bin$ sh kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic222
>hi
>h
複製代碼
消費消息輸出, 說明消費成功
partition0的數據:
{"offset":0,"timeStamp":1574012251856,"val":"hi"}
{"offset":1,"timeStamp":1574012270368,"val":"h"}
複製代碼
最後附上github源碼地址:
感興趣的同窗能夠參考一下,共同窗習進步.