>舒適提示:整個 Kafka Client 專欄基於 kafka-2.3.0 版本。java
根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具備以下特徵:算法
KafkaProducer 是線程安全的,能夠被多個線程交叉使用。apache
KafkaProducer 內部包含一個緩存池,存放待發送消息,即 ProducerRecord 隊列,與此同時會開啓一個IO線程將 ProducerRecord 對象發送到 Kafka 集羣。bootstrap
KafkaProducer 的消息發送 API send 方法是異步,只負責將待發送消息 ProducerRecord 發送到緩存區中,當即返回,並返回一個結果憑證 Future。api
acks KafkaProducer 提供了一個核心參數 acks 用來定義消息「已提交」的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值以下:緩存
retries kafka 在生產端提供的另一個核心屬性,用來控制消息在發送失敗後的重試次數,設置爲 0 表示不重試,重試就有可能形成消息在發送端的重複。安全
batch.size kafka 消息發送者爲每個分區維護一個未發送消息積壓緩存區,其內存大小由batch.size指定,默認爲 16K。 但若是緩存區中不足100條,但發送線程此時空閒,是須要等到緩存區中積滿100條才能發送仍是能夠當即發送呢?默認是當即發送,即 batch.size 的做用實際上是客戶端一次發送到broker的最大消息數量。架構
linger.ms 爲了提升 kafka 消息發送的高吞吐量,即控制在緩存區中未積滿 batch.size 時來控制 消息發送線程的行爲,是當即發送仍是等待必定時間,若是linger.ms 設置爲 0表示當即發送,若是設置爲大於0,則消息發送線程會等待這個值後纔會向broker發送。該參數者會增長響應時間,但有利於增長吞吐量。有點相似於 TCP 領域的 Nagle 算法。併發
buffer.memory 用於控制消息發送者緩存的總內存大小,若是超過該值,往緩存區中添加消息會被阻塞,具體會在下文的消息發送流程中詳細介紹,阻塞的最大時間可經過參數 max.block.ms 設置,阻塞超過該值會拋出超時異常。app
key.serializer 指定 key 的序列化處理器。
value.serializer 指定 消息體的序列化處理器。
enable.idempotence 從 kafka0.11版本開始,支持消息傳遞冪等,能夠作到消息只會被傳遞一次,經過 enable.idempotence 爲 true 來開啓。若是該值設置爲 true,其 retries 將設置爲 Integer.MAX_VALUE,acks 將被設置爲 all。爲了確保消息發送冪等性,必須避免應用程序端的任何重試,而且若是消息發送API若是返回錯誤,應用端應該記錄最後成功發送的消息,避免消息的重複發送。
從Kafka 0.11開始,kafka 也支持事務消息。
在 Kafka 中,生產者經過接口 Producer 定義,經過該接口的方法,咱們基本能夠得知 KafkaProducer 將具有以下基本能力:
上面的方法咱們會根據須要在後續文章中進行詳細的介紹。接下來咱們看一下 KafkaProducer 的核心屬性的含義。
通過上面的梳理,詳細讀者朋友對 KafkaProducer 消息生產者有了一個大概的認識,下一篇會重點介紹消息發送流程。接下來咱們以一個簡單的示例結束本文的學習。
package persistent.prestige.demo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; public class KafkaProducerTest { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<string, string> producer = new KafkaProducer<>(props); try { for (int i = 0; i < 100; i++) { Future<recordmetadata> future = producer.send(new ProducerRecord<string, string>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.printf("offset:" + recordMetadata.offset()); } } catch (Throwable e) { e.printStackTrace(); } finally { producer.close(); } } }
本文就介紹到這裏,其主要的目的是瞭解Kafka 的 Producer,引出後續須要學習的內容,下一篇將重點講述 Kafka 消息的發送流程,敬請關注。
若是本文對你們有所幫助的話,麻煩幫忙點個贊,謝謝。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</string,></recordmetadata></string,></k,></k,></k,>