寫文章 Kafka 探險 - 生產者源碼分析: 核心組件

前言

咱們說 Kafka 是一個消息隊列,其實更加確切的說:是 Broker 這個核心部件。爲什麼這麼說?你會發現咱們能夠經過控制檯、 Java 代碼、 C++ 代碼、甚至是 Socket 向 Broker 寫入消息,只要咱們聽從了 Kafka 寫入消息的協議,就能夠將消息發送到 Kafka 隊列中。數組

用專業一點的話術來講,Kafka 定義了一個應用層的網絡協議,只要咱們基於傳輸層構造出符 合這個協議的數據,就是合法的 Kafka 消息。緩存

dfe1f696c4167d5584cef38c3c75fa16.jpeg

因此說咱們寫入 Kafka 消息的只是一個生產者的客戶端,他的形式多種多樣,有 Java ,Python,C++ 等多種實現,那麼咱們每次發消息難道還須要本身去實現這套發送消息的協議麼?顯然 Kafka 官方已經考慮到這個問題了,爲了給咱們提供 開箱即用 的消息隊列,官方已經幫咱們寫好了各類語言的優質生產者實現,例如咱們今天要討論的 Java 版本的實現。網絡

思考

前面提到 Kafka 幫咱們實現了各個版本的生產者代碼,其實他也能夠徹底不提供這份代碼,由於核心的隊列的功能已經實現了,這些客戶端的代碼也能夠徹底交由用戶本身實現。架構

那麼假如沒有官方代碼,咱們又該實現一些什麼功能,有哪些接口,哪些方法,以及如何組織這些代碼呢。帶着這樣的問題咱們一塊兒來思考一下!通常對於這種帶有數據流轉的設計,我會從 由誰產生? 什麼數據? 通往哪去? 如何保證通路可靠? 這幾個方面來考慮。app

消息天然是經過應用程序構造出來並提供給生產者,生產者首先要知道須要將消息發送到哪一個 Broker 的哪一個 Topic,以及 Topic 的具體 Partition 。那麼必然須要配置客戶端的 Broker集羣地址 ,須要發送的 Topic 名稱 ,以及 消息的分區策略 ,是指定到具體的分區仍是經過某個 key hash 到不一樣的分區。框架

知道了消息要通往哪,還須要知道發送的是什麼格式的消息,是字符串仍是數字或是被序列化的二進制對象。 消息序列化 將須要消息序列化成字節數組才方便在網絡上傳輸,因此要配置生產者的消息序列化策略,最好是能夠經過傳遞枚舉或者類名的方式自動構造序列化器,便於後續序列化過程的擴展。異步

從上面一篇文章 《Kafka 探險 - 架構簡介》 瞭解到:消息隊列經常用於多個系統之間的異步調用,那麼這種調用關係就沒有強實時依賴。因爲發消息到 Kafka 會產生 網絡 I/O ,相對來講比較耗時,那麼消息發送這一動做除了同步調用, 是否也能夠設置爲異步,提升生產者的吞吐呢? 。而且大量消息發送場景, 咱們能夠設置一個窗口,窗口能夠是時間維度也能夠是消息數量維度,將消息積攢起來批次發送,減小網絡 I/O 次數,提升吞吐量。ide

最後呢爲了保證消息能夠最大程度的成功發送到 Broker ,咱們還須要一些 失敗重試機制 ,例如失敗後放到重試隊列中,隔一段時間嘗試再次發送。學習

理清思路

經過上面的分析,咱們會有一個大體的認識,應該會有哪些方法,以及底層的大體的設計會分爲哪幾個部分。可是不夠清楚,不夠明晰。優化


首先總結一下實現客戶端的幾個要點在於:

  1. 配置 Broker 基礎信息:集羣地址、Topic、Partition
  2. 消息序列化,經過可擴展的序列化器實現
  3. 消息異步寫入緩衝區,網絡 I/O 線程實現消息發送
  4. 消息發送的失敗重試機制


話很少說,用一張圖畫出各個核心模塊以及他們之間的交互順序:

76e9e534abb463afa9cc146c15c3f00b.jpeg


用戶設定 Kafka 集羣信息,生產者從 Kafka Broker 上拉取 可用 Kafka 節點、Topic 以及 Partition 對應關係。緩存到生產者成員變量中,若是 Broker 集羣有擴容,或者有機器下線須要從新獲取這些服務信息。

客戶端根據用戶設置的序列化器,對消息進行序列化,以後異步的將消息寫入到客戶端緩衝區。緩衝區內的消息到達必定的數量或者到達一個時間窗口後,網絡 I/O 線程將消息從緩衝區取走,發送到 Broker 。

以上就是我對於一個 Kafka 生產者實現的思考,接下來看看官方的代碼設計與咱們的思路有何差異,他又是爲何這麼設計。


官方設計

其實通過上面的思考和整理,咱們的設計已經很是接近 Kafka 的官方設計了,官方的模塊拆分的更加細緻,功能更加獨立。

核心組件

首先看一眼 KafkaProducer 類中有哪些成員變量,這些變量就是 Producer 的核心組件。


e9ad6609a6cf45862d9e83f824ab79ae.jpeg


其中核心字段的解釋以下:

clinetId :標識發送者Id

metric :統計指標

partitioner :分區器做用是決定消息發到哪一個分區。有 key 則按照 key 的 hash ,不然使用 roundrobin

key/value Serializer :消息 key/value 序列化器

interceptors :發送以前/後對消息的統一處理

maxRequestSize :能夠發送的最大消息,默認值是1M,即影響一個消息 Record 的大小,此值在服務端也是有限制的。

maxBlockTimeMs :buffer滿了或者等待metadata信息的,超時的補償機制

accumulator :累積緩衝器

networkClient :包裝的網絡層

sender :網絡 I/O 線程

發送流程


發送一條消息的時候,數據又是怎樣在這些組件之間進行流轉的呢?


cfb232fed72c100fa6a637b811bcb626.jpeg


Producer調用 send 方法後,在從 Broker 獲取的 Metadata 有效狀況下,通過攔截器和序列化後,被分區器放到了一個緩衝區的特定位置,緩衝區由一個 ConcurrentHashMap 構成,key 爲主題分區,value 是一個 deque 存放消息緩存塊。從客戶端角度來看若是無需關心發送結果,發送流程就已經結束了。

接下來是獨立的Sender線程負責從緩衝中獲取足量的數據調用 Network Client 封裝層去真正發送數據,這裏使用了 Java8 的 NIO 網絡模型發送數據。

能夠看到整個邏輯的關鍵點在於 RecordAccumulator 如何進行消息緩存,通常的成熟框架和中間件中都會有一套本身的內存管理機制,好比 Netty 也有一套複雜而又精妙的內存管理抽象層,這裏的緩衝區也是同樣的道理,主要須要去看看 Kafka 如何去作內存管理。

另外須要關注 Sender 從緩衝裏以什麼樣的邏輯獲取數據,來達到儘可能少的網絡交互發送儘可能多的數據。還有網絡失敗又是如何保證數據的可靠性的。這個地方也是咱們的設計和官方實現的差距,對於網絡 I/O 的精心優化。

目前的篇幅已經比較長了,爲了你們方便閱讀理解,本篇主要從和你們一塊兒思考如何設計一個 Kafka Producer 以及官方是如何實現的,咱們之間的差距是什麼,更須要關注的點是什麼。經過本身的思考和對比更加能認識到不足學習到新的點!

尾聲(嘮叨)


這篇文章從周內就開始了,後面斷斷續續天天寫了點,只是天天回去的確實有點晚,偶爾還給我整個失眠,精神狀態不太好,週五六點多飯都沒吃直接回家睡覺了,確實好睏,但願下週能休息好。

這周的工做壓力也很大,主要是須要推進不少上下游協同,還須要定方案。常常在想怎麼交涉?怎麼修改方案你們會認同?怎樣說服他們? 是壓力也是鍛鍊,說明這方面欠缺的較多,該補!

下篇文章主要會寫 KafkaProducer 的緩存內存管理機制,Meta 信息更新機制,以及網絡 I/O 模型的設計。敬請期待~

相關文章
相關標籤/搜索