在上文 設計一個百萬級的消息推送系統 中提到消息流轉採用的是 Kafka
做爲中間件。java
其中有朋友諮詢在大量消息的狀況下 Kakfa
是如何保證消息的高效及一致性呢?算法
正好以這個問題結合 Kakfa
的源碼討論下如何正確、高效的發送消息。apache
內容較多,對源碼感興趣的朋友請繫好安全帶😏(源碼基於
v0.10.0.0
版本分析)。同時最好是有必定的 Kafka 使用經驗,知曉基本的用法。
在分析以前先看一個簡單的消息發送是怎麼樣的。bootstrap
如下代碼基於 SpringBoot 構建。
首先建立一個 org.apache.kafka.clients.producer.Producer
的 bean。緩存
主要關注 bootstrap.servers
,它是必填參數。指的是 Kafka 集羣中的 broker 地址,例如 127.0.0.1:9094
。安全
其他幾個參數暫時不作討論,後文會有詳細介紹。
接着注入這個 bean 便可調用它的發送函數發送消息。網絡
這裏我給某一個 Topic 發送了 10W 條數據,運行程序消息正常發送。併發
但這僅僅只是作到了消息發送,對消息是否成功送達徹底沒管,等因而純異步
的方式。app
那麼我想知道消息到底發送成功沒有該怎麼辦呢?異步
其實 Producer
的 API
已經幫咱們考慮到了,發送以後只須要調用它的 get()
方法便可同步獲取發送結果。
發送結果:
這樣的發送效率實際上是比較低下的,由於每次都須要同步等待消息發送的結果。
爲此咱們應當採起異步的方式發送,其實 send()
方法默認則是異步的,只要不手動調用 get()
方法。
但這樣就無法獲知發送結果。
因此查看 send()
的 API 能夠發現還有一個參數。
Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);
Callback
是一個回調接口,在消息發送完成以後能夠回調咱們自定義的實現。
執行以後的結果:
一樣的也能獲取結果,同時發現回調的線程並非上文同步時的主線程
,這樣也能證實是異步回調的。
同時回調的時候會傳遞兩個參數:
RecordMetadata
和上文一致的消息發送成功後的元數據。Exception
消息發送過程當中的異常信息。可是這兩個參數並不會同時都有數據,只有發送失敗纔會有異常信息,同時發送元數據爲空。
因此正確的寫法應當是:
至於爲何會只有參數一個有值,在下文的源碼分析中會一一解釋。
如今只掌握了基本的消息發送,想要深入的理解發送中的一些參數配置仍是得源碼說了算。
首先仍是來談談消息發送時的整個流程是怎麼樣的,Kafka
並非簡單的把消息經過網絡發送到了 broker
中,在 Java 內部仍是通過了許多優化和設計。
爲了直觀的瞭解發送的流程,簡單的畫了幾個在發送過程當中關鍵的步驟。
從上至下依次是:
kafka-producer-network-thread
IO 線程。接下來詳解每一個步驟。
調用該構造方法進行初始化時,不止是簡單的將基本參數寫入 KafkaProducer
。比較麻煩的是初始化 Sender
線程進行緩衝區消費。
初始化 IO 線程處:
能夠看到 Sender 線程有須要成員變量,好比:
acks,retries,requestTimeout
等,這些參數會在後文分析。
在調用 send()
函數後其實第一步就是序列化,畢竟咱們的消息須要經過網絡才能發送到 Kafka。
其中的 valueSerializer.serialize(record.topic(), record.value());
是一個接口,咱們須要在初始化時候指定序列化實現類。
咱們也能夠本身實現序列化,只須要實現 org.apache.kafka.common.serialization.Serializer
接口便可。
接下來就是路由分區,一般咱們使用的 Topic
爲了實現擴展性以及高性能都會建立多個分區。
若是是一個分區好說,全部消息都往裏面寫入便可。
但多個分區就不可避免須要知道寫入哪一個分區。
一般有三種方式。
能夠在構建 ProducerRecord
爲每條消息指定分區。
這樣在路由時會判斷是否有指定,有就直接使用該分區。
這種通常在特殊場景下會使用。
若是沒有指定分區,則會調用 partitioner.partition
接口執行自定義分區策略。
而咱們也只須要自定義一個類實現 org.apache.kafka.clients.producer.Partitioner
接口,同時在建立 KafkaProducer
實例時配置 partitioner.class
參數。
一般須要自定義分區通常是在想盡可能的保證消息的順序性。
或者是寫入某些特有的分區,由特別的消費者來進行處理等。
最後一種則是默認的路由策略,若是咱們啥都沒作就會執行該策略。
該策略也會使得消息分配的比較均勻。
來看看它的實現:
簡單的來講分爲如下幾步:
其實這就是很典型的輪詢算法,因此只要分區數不頻繁變更這種方式也會比較均勻。
在 send()
方法拿到分區後會調用一個 append()
函數:
該函數中會調用一個 getOrCreateDeque()
寫入到一個內部緩存中 batches
。
在最開始初始化的 IO 線程實際上是一個守護線程,它會一直消費這些數據。
經過圖中的幾個函數會獲取到以前寫入的數據。這塊內容能夠沒必要深究,但其中有個 completeBatch
方法卻很是關鍵。
調用該方法時候確定已是消息發送完畢了,因此會調用 batch.done()
來完成以前咱們在 send()
方法中定義的回調接口。
從這裏也能夠看出爲何以前說發送完成後元數據和異常信息只會出現一個。
發送流程講完了再來看看 Producer
中比較重要的幾個參數。
acks
是一個影響消息吞吐量的一個關鍵參數。
主要有 [all、-1, 0, 1]
這幾個選項,默認爲 1。
因爲 Kafka
不是採起的主備模式,而是採用相似於 Zookeeper 的主備模式。
前提是Topic
配置副本數量replica > 1
。
當 acks = all/-1
時:
意味着會確保全部的 follower 副本都完成數據的寫入纔會返回。
這樣能夠保證消息不會丟失!
但同時性能和吞吐量倒是最低的。
當 acks = 0
時:
producer 不會等待副本的任何響應,這樣最容易丟失消息但同時性能倒是最好的!
當 acks = 1
時:
這是一種折中的方案,它會等待副本 Leader 響應,但不會等到 follower 的響應。
一旦 Leader 掛掉消息就會丟失。但性能和消息安全性都獲得了必定的保證。
這個參數看名稱就知道是內部緩存區的大小限制,對他適當的調大能夠提升吞吐量。
但也不能極端,調太大會浪費內存。小了也發揮不了做用,也是一個典型的時間和空間的權衡。
上圖是幾個使用的體現。
retries
該參數主要是來作重試使用,當發生一些網絡抖動都會形成重試。
這個參數也就是限制重試次數。
但也有一些其餘問題。
消息重複
。這種只能是消費者進行冪等處理。若是消息量真的很是大,同時又須要儘快的將消息發送到 Kafka
。一個 producer
始終會收到緩存大小等影響。
那是否能夠建立多個 producer
來進行發送呢?
producer
,獲取的同時判斷是否達到最大上限,沒有就新建一個同時保存到內部的 List
中,保存時作好同步處理防止併發問題。這樣在大量、頻繁的消息發送場景中能夠提升發送效率減輕單個 producer
的壓力。
最後則是 Producer
的關閉,Producer 在使用過程當中消耗了很多資源(線程、內存、網絡等)所以須要顯式的關閉從而回收這些資源。
默認的 close()
方法和帶有超時時間的方法都是在必定的時間後強制關閉。
但在過時以前都會處理完剩餘的任務。
因此使用哪個得視狀況而定。
本文內容較多,從實例和源碼的角度分析了 Kafka 生產者。
但願看完的朋友能有收穫,同時也歡迎留言討論。
不出意外下期會討論 Kafka 消費者。
若是對你有幫助還請分享讓更多的人看到。
歡迎關注公衆號一塊兒交流: