從源碼分析如何優雅的使用 Kafka 生產者

前言

在上文 設計一個百萬級的消息推送系統 中提到消息流轉採用的是 Kafka 做爲中間件。java

其中有朋友諮詢在大量消息的狀況下 Kakfa 是如何保證消息的高效及一致性呢?算法

正好以這個問題結合 Kakfa 的源碼討論下如何正確、高效的發送消息。apache

內容較多,對源碼感興趣的朋友請繫好安全帶😏(源碼基於 v0.10.0.0 版本分析)。同時最好是有必定的 Kafka 使用經驗,知曉基本的用法。

簡單的消息發送

在分析以前先看一個簡單的消息發送是怎麼樣的。bootstrap

如下代碼基於 SpringBoot 構建。

首先建立一個 org.apache.kafka.clients.producer.Producer 的 bean。緩存

主要關注 bootstrap.servers,它是必填參數。指的是 Kafka 集羣中的 broker 地址,例如 127.0.0.1:9094安全

其他幾個參數暫時不作討論,後文會有詳細介紹。

接着注入這個 bean 便可調用它的發送函數發送消息。網絡

這裏我給某一個 Topic 發送了 10W 條數據,運行程序消息正常發送。併發

但這僅僅只是作到了消息發送,對消息是否成功送達徹底沒管,等因而純異步的方式。app

同步

那麼我想知道消息到底發送成功沒有該怎麼辦呢?異步

其實 ProducerAPI 已經幫咱們考慮到了,發送以後只須要調用它的 get() 方法便可同步獲取發送結果。

發送結果:

這樣的發送效率實際上是比較低下的,由於每次都須要同步等待消息發送的結果。

異步

爲此咱們應當採起異步的方式發送,其實 send() 方法默認則是異步的,只要不手動調用 get() 方法。

但這樣就無法獲知發送結果。

因此查看 send() 的 API 能夠發現還有一個參數。

Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);

Callback 是一個回調接口,在消息發送完成以後能夠回調咱們自定義的實現。

執行以後的結果:

一樣的也能獲取結果,同時發現回調的線程並非上文同步時的主線程,這樣也能證實是異步回調的。

同時回調的時候會傳遞兩個參數:

  • RecordMetadata 和上文一致的消息發送成功後的元數據。
  • Exception 消息發送過程當中的異常信息。

可是這兩個參數並不會同時都有數據,只有發送失敗纔會有異常信息,同時發送元數據爲空。

因此正確的寫法應當是:

至於爲何會只有參數一個有值,在下文的源碼分析中會一一解釋。

源碼分析

如今只掌握了基本的消息發送,想要深入的理解發送中的一些參數配置仍是得源碼說了算。

首先仍是來談談消息發送時的整個流程是怎麼樣的,Kafka 並非簡單的把消息經過網絡發送到了 broker 中,在 Java 內部仍是通過了許多優化和設計。

發送流程

爲了直觀的瞭解發送的流程,簡單的畫了幾個在發送過程當中關鍵的步驟。

從上至下依次是:

  • 初始化以及真正發送消息的 kafka-producer-network-thread IO 線程。
  • 將消息序列化。
  • 獲得須要發送的分區。
  • 寫入內部的一個緩存區中。
  • 初始化的 IO 線程不斷的消費這個緩存來發送消息。

步驟解析

接下來詳解每一個步驟。

初始化

調用該構造方法進行初始化時,不止是簡單的將基本參數寫入 KafkaProducer。比較麻煩的是初始化 Sender 線程進行緩衝區消費。

初始化 IO 線程處:

能夠看到 Sender 線程有須要成員變量,好比:

acks,retries,requestTimeout

等,這些參數會在後文分析。

序列化消息

在調用 send() 函數後其實第一步就是序列化,畢竟咱們的消息須要經過網絡才能發送到 Kafka。

其中的 valueSerializer.serialize(record.topic(), record.value()); 是一個接口,咱們須要在初始化時候指定序列化實現類。

咱們也能夠本身實現序列化,只須要實現 org.apache.kafka.common.serialization.Serializer 接口便可。

路由分區

接下來就是路由分區,一般咱們使用的 Topic 爲了實現擴展性以及高性能都會建立多個分區。

若是是一個分區好說,全部消息都往裏面寫入便可。

但多個分區就不可避免須要知道寫入哪一個分區。

一般有三種方式。

指定分區

能夠在構建 ProducerRecord 爲每條消息指定分區。

這樣在路由時會判斷是否有指定,有就直接使用該分區。

這種通常在特殊場景下會使用。

自定義路由策略

若是沒有指定分區,則會調用 partitioner.partition 接口執行自定義分區策略。

而咱們也只須要自定義一個類實現 org.apache.kafka.clients.producer.Partitioner 接口,同時在建立 KafkaProducer 實例時配置 partitioner.class 參數。

一般須要自定義分區通常是在想盡可能的保證消息的順序性。

或者是寫入某些特有的分區,由特別的消費者來進行處理等。

默認策略

最後一種則是默認的路由策略,若是咱們啥都沒作就會執行該策略。

該策略也會使得消息分配的比較均勻。

來看看它的實現:

簡單的來講分爲如下幾步:

  • 獲取 Topic 分區數。
  • 將內部維護的一個線程安全計數器 +1。
  • 與分區數取模獲得分區編號。

其實這就是很典型的輪詢算法,因此只要分區數不頻繁變更這種方式也會比較均勻。

寫入內部緩存

send() 方法拿到分區後會調用一個 append() 函數:

該函數中會調用一個 getOrCreateDeque() 寫入到一個內部緩存中 batches

消費緩存

在最開始初始化的 IO 線程實際上是一個守護線程,它會一直消費這些數據。

經過圖中的幾個函數會獲取到以前寫入的數據。這塊內容能夠沒必要深究,但其中有個 completeBatch 方法卻很是關鍵。

調用該方法時候確定已是消息發送完畢了,因此會調用 batch.done() 來完成以前咱們在 send() 方法中定義的回調接口。

從這裏也能夠看出爲何以前說發送完成後元數據和異常信息只會出現一個。

Producer 參數解析

發送流程講完了再來看看 Producer 中比較重要的幾個參數。

acks

acks 是一個影響消息吞吐量的一個關鍵參數。

主要有 [all、-1, 0, 1] 這幾個選項,默認爲 1。

因爲 Kafka 不是採起的主備模式,而是採用相似於 Zookeeper 的主備模式。

前提是 Topic 配置副本數量 replica > 1

acks = all/-1 時:

意味着會確保全部的 follower 副本都完成數據的寫入纔會返回。

這樣能夠保證消息不會丟失!

但同時性能和吞吐量倒是最低的。

acks = 0 時:

producer 不會等待副本的任何響應,這樣最容易丟失消息但同時性能倒是最好的!

acks = 1 時:

這是一種折中的方案,它會等待副本 Leader 響應,但不會等到 follower 的響應。

一旦 Leader 掛掉消息就會丟失。但性能和消息安全性都獲得了必定的保證。

batch.size

這個參數看名稱就知道是內部緩存區的大小限制,對他適當的調大能夠提升吞吐量。

但也不能極端,調太大會浪費內存。小了也發揮不了做用,也是一個典型的時間和空間的權衡。

上圖是幾個使用的體現。

retries

retries 該參數主要是來作重試使用,當發生一些網絡抖動都會形成重試。

這個參數也就是限制重試次數。

但也有一些其餘問題。

  • 由於是重發因此消息順序可能不會一致,這也是上文提到就算是一個分區消息也不會是徹底順序的狀況。
  • 仍是因爲網絡問題,原本消息已經成功寫入了可是沒有成功響應給 producer,進行重試時就可能會出現消息重複。這種只能是消費者進行冪等處理。

高效的發送方式

若是消息量真的很是大,同時又須要儘快的將消息發送到 Kafka。一個 producer 始終會收到緩存大小等影響。

那是否能夠建立多個 producer 來進行發送呢?

  • 配置一個最大 producer 個數。
  • 發送消息時首先獲取一個 producer,獲取的同時判斷是否達到最大上限,沒有就新建一個同時保存到內部的 List 中,保存時作好同步處理防止併發問題。
  • 獲取發送者時能夠按照默認的分區策略使用輪詢的方式獲取(保證使用均勻)。

這樣在大量、頻繁的消息發送場景中能夠提升發送效率減輕單個 producer 的壓力。

關閉 Producer

最後則是 Producer 的關閉,Producer 在使用過程當中消耗了很多資源(線程、內存、網絡等)所以須要顯式的關閉從而回收這些資源。

默認的 close() 方法和帶有超時時間的方法都是在必定的時間後強制關閉。

但在過時以前都會處理完剩餘的任務。

因此使用哪個得視狀況而定。

總結

本文內容較多,從實例和源碼的角度分析了 Kafka 生產者。

但願看完的朋友能有收穫,同時也歡迎留言討論。

不出意外下期會討論 Kafka 消費者。

若是對你有幫助還請分享讓更多的人看到。

歡迎關注公衆號一塊兒交流:

相關文章
相關標籤/搜索