咱們說 Kafka 是一個消息隊列,其實更加確切的說:是 Broker 這個核心部件。爲什麼這麼說?你會發現咱們能夠經過控制檯、 Java 代碼、 C++ 代碼、甚至是 Socket 向 Broker 寫入消息,只要咱們聽從了 Kafka 寫入消息的協議,就能夠將消息發送到 Kafka 隊列中。數組
用專業一點的話術來講,Kafka 定義了一個應用層的網絡協議,只要咱們基於傳輸層構造出符 合這個協議的數據,就是合法的 Kafka 消息。緩存
因此說咱們寫入 Kafka 消息的只是一個生產者的客戶端,他的形式多種多樣,有 Java ,Python,C++ 等多種實現,那麼咱們每次發消息難道還須要本身去實現這套發送消息的協議麼?顯然 Kafka 官方已經考慮到這個問題了,爲了給咱們提供 開箱即用
的消息隊列,官方已經幫咱們寫好了各類語言的優質生產者實現,例如咱們今天要討論的 Java 版本的實現。網絡
前面提到 Kafka 幫咱們實現了各個版本的生產者代碼,其實他也能夠徹底不提供這份代碼,由於核心的隊列的功能已經實現了,這些客戶端的代碼也能夠徹底交由用戶本身實現。架構
那麼假如沒有官方代碼,咱們又該實現一些什麼功能,有哪些接口,哪些方法,以及如何組織這些代碼呢。帶着這樣的問題咱們一塊兒來思考一下!通常對於這種帶有數據流轉的設計,我會從 由誰產生?
什麼數據?
通往哪去?
如何保證通路可靠?
這幾個方面來考慮。app
消息天然是經過應用程序構造出來並提供給生產者,生產者首先要知道須要將消息發送到哪一個 Broker 的哪一個 Topic,以及 Topic 的具體 Partition 。那麼必然須要配置客戶端的 Broker集羣地址
,須要發送的 Topic 名稱
,以及 消息的分區策略
,是指定到具體的分區仍是經過某個 key hash 到不一樣的分區。框架
知道了消息要通往哪,還須要知道發送的是什麼格式的消息,是字符串仍是數字或是被序列化的二進制對象。 消息序列化
將須要消息序列化成字節數組才方便在網絡上傳輸,因此要配置生產者的消息序列化策略,最好是能夠經過傳遞枚舉或者類名的方式自動構造序列化器,便於後續序列化過程的擴展。異步
從上面一篇文章 《Kafka 探險 - 架構簡介》 瞭解到:消息隊列經常用於多個系統之間的異步調用,那麼這種調用關係就沒有強實時依賴。因爲發消息到 Kafka 會產生 網絡 I/O
,相對來講比較耗時,那麼消息發送這一動做除了同步調用, 是否也能夠設置爲異步,提升生產者的吞吐呢?
。而且大量消息發送場景, 咱們能夠設置一個窗口,窗口能夠是時間維度也能夠是消息數量維度,將消息積攢起來批次發送,減小網絡 I/O 次數,提升吞吐量。ide
最後呢爲了保證消息能夠最大程度的成功發送到 Broker ,咱們還須要一些 失敗重試機制
,例如失敗後放到重試隊列中,隔一段時間嘗試再次發送。學習
經過上面的分析,咱們會有一個大體的認識,應該會有哪些方法,以及底層的大體的設計會分爲哪幾個部分。可是不夠清楚,不夠明晰。優化
首先總結一下實現客戶端的幾個要點在於:
話很少說,用一張圖畫出各個核心模塊以及他們之間的交互順序:
用戶設定 Kafka 集羣信息,生產者從 Kafka Broker 上拉取 可用 Kafka 節點、Topic 以及 Partition 對應關係。緩存到生產者成員變量中,若是 Broker 集羣有擴容,或者有機器下線須要從新獲取這些服務信息。
客戶端根據用戶設置的序列化器,對消息進行序列化,以後異步的將消息寫入到客戶端緩衝區。緩衝區內的消息到達必定的數量或者到達一個時間窗口後,網絡 I/O 線程將消息從緩衝區取走,發送到 Broker 。
以上就是我對於一個 Kafka 生產者實現的思考,接下來看看官方的代碼設計與咱們的思路有何差異,他又是爲何這麼設計。
其實通過上面的思考和整理,咱們的設計已經很是接近 Kafka 的官方設計了,官方的模塊拆分的更加細緻,功能更加獨立。
首先看一眼 KafkaProducer 類中有哪些成員變量,這些變量就是 Producer 的核心組件。
其中核心字段的解釋以下:
clinetId
:標識發送者Id
metric
:統計指標
partitioner
:分區器做用是決定消息發到哪一個分區。有 key 則按照 key 的 hash ,不然使用 roundrobin
key/value Serializer
:消息 key/value 序列化器
interceptors
:發送以前/後對消息的統一處理
maxRequestSize
:能夠發送的最大消息,默認值是1M,即影響一個消息 Record 的大小,此值在服務端也是有限制的。
maxBlockTimeMs
:buffer滿了或者等待metadata信息的,超時的補償機制
accumulator
:累積緩衝器
networkClient
:包裝的網絡層
sender
:網絡 I/O 線程
發送一條消息的時候,數據又是怎樣在這些組件之間進行流轉的呢?
Producer調用 send 方法後,在從 Broker 獲取的 Metadata 有效狀況下,通過攔截器和序列化後,被分區器放到了一個緩衝區的特定位置,緩衝區由一個 ConcurrentHashMap 構成,key 爲主題分區,value 是一個 deque 存放消息緩存塊。從客戶端角度來看若是無需關心發送結果,發送流程就已經結束了。
接下來是獨立的Sender線程負責從緩衝中獲取足量的數據調用 Network Client 封裝層去真正發送數據,這裏使用了 Java8 的 NIO 網絡模型發送數據。
能夠看到整個邏輯的關鍵點在於 RecordAccumulator 如何進行消息緩存,通常的成熟框架和中間件中都會有一套本身的內存管理機制,好比 Netty 也有一套複雜而又精妙的內存管理抽象層,這裏的緩衝區也是同樣的道理,主要須要去看看 Kafka 如何去作內存管理。
另外須要關注 Sender 從緩衝裏以什麼樣的邏輯獲取數據,來達到儘可能少的網絡交互發送儘可能多的數據。還有網絡失敗又是如何保證數據的可靠性的。這個地方也是咱們的設計和官方實現的差距,對於網絡 I/O 的精心優化。
目前的篇幅已經比較長了,爲了你們方便閱讀理解,本篇主要從和你們一塊兒思考如何設計一個 Kafka Producer 以及官方是如何實現的,咱們之間的差距是什麼,更須要關注的點是什麼。經過本身的思考和對比更加能認識到不足學習到新的點!
這篇文章從周內就開始了,後面斷斷續續天天寫了點,只是天天回去的確實有點晚,偶爾還給我整個失眠,精神狀態不太好,週五六點多飯都沒吃直接回家睡覺了,確實好睏,但願下週能休息好。
這周的工做壓力也很大,主要是須要推進不少上下游協同,還須要定方案。常常在想怎麼交涉?怎麼修改方案你們會認同?怎樣說服他們? 是壓力也是鍛鍊,說明這方面欠缺的較多,該補!
下篇文章主要會寫 KafkaProducer 的緩存內存管理機制,Meta 信息更新機制,以及網絡 I/O 模型的設計。敬請期待~