舒適提示:整個 Kafka Client 專欄基於 kafka-2.3.0 版本。算法
@(本節目錄)apache
一、KafkaProducer 概述
根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具備以下特徵:bootstrap
- KafkaProducer 是線程安全的,能夠被多個線程交叉使用。
- KafkaProducer 內部包含一個緩存池,存放待發送消息,即 ProducerRecord 隊列,與此同時會開啓一個IO線程將 ProducerRecord 對象發送到 Kafka 集羣。
- KafkaProducer 的消息發送 API send 方法是異步,只負責將待發送消息 ProducerRecord 發送到緩存區中,當即返回,並返回一個結果憑證 Future。
- acks
KafkaProducer 提供了一個核心參數 acks 用來定義消息「已提交」的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值以下:
- 0
表示生產者不關係該條消息在 broker 端的處理結果,只要調用 KafkaProducer 的 send 方法返回後即認爲成功,顯然這種方式是最不安全的,由於 Broker 端可能壓根都沒有收到該條消息或存儲失敗。
- all 或 -1
表示消息不只須要 Leader 節點已存儲該消息,而且要求其副本(準確的來講是 ISR 中的節點)所有存儲才認爲已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,固然性能也最低。
- 1
表示消息只須要寫入 Leader 節點後就能夠向客戶端返回提交成功。
- retries
kafka 在生產端提供的另一個核心屬性,用來控制消息在發送失敗後的重試次數,設置爲 0 表示不重試,重試就有可能形成消息在發送端的重複。
- batch.size
kafka 消息發送者爲每個分區維護一個未發送消息積壓緩存區,其內存大小由batch.size指定,默認爲 16K。
但若是緩存區中不足100條,但發送線程此時空閒,是須要等到緩存區中積滿100條才能發送仍是能夠當即發送呢?默認是當即發送,即 batch.size 的做用實際上是客戶端一次發送到broker的最大消息數量。
- linger.ms
爲了提升 kafka 消息發送的高吞吐量,即控制在緩存區中未積滿 batch.size 時來控制 消息發送線程的行爲,是當即發送仍是等待必定時間,若是linger.ms 設置爲 0表示當即發送,若是設置爲大於0,則消息發送線程會等待這個值後纔會向broker發送。該參數者會增長響應時間,但有利於增長吞吐量。有點相似於 TCP 領域的 Nagle 算法。
- buffer.memory
用於控制消息發送者緩存的總內存大小,若是超過該值,往緩存區中添加消息會被阻塞,具體會在下文的消息發送流程中詳細介紹,阻塞的最大時間可經過參數 max.block.ms 設置,阻塞超過該值會拋出超時異常。
- key.serializer
指定 key 的序列化處理器。
- value.serializer
指定 消息體的序列化處理器。
- enable.idempotence
從 kafka0.11版本開始,支持消息傳遞冪等,能夠作到消息只會被傳遞一次,經過 enable.idempotence 爲 true 來開啓。若是該值設置爲 true,其 retries 將設置爲 Integer.MAX_VALUE,acks 將被設置爲 all。爲了確保消息發送冪等性,必須避免應用程序端的任何重試,而且若是消息發送API若是返回錯誤,應用端應該記錄最後成功發送的消息,避免消息的重複發送。
從Kafka 0.11開始,kafka 也支持事務消息。api
二、KafkaProducer 類圖
![在這裏插入圖片描述](http://static.javashuo.com/static/loading.gif)
在 Kafka 中,生產者經過接口 Producer 定義,經過該接口的方法,咱們基本能夠得知 KafkaProducer 將具有以下基本能力:緩存
- void initTransactions()
初始化事務,若是須要使用事務方法,該方法必須首先被調用。
- void beginTransaction()
開啓事務。
- void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
向消費組提交當前事務中的消息偏移量,將在介紹 Kafka 事務相關文章中詳細介紹。
- void commitTransaction()
提交事務。
- void abortTransaction()
回滾事務。
- Future< RecordMetadata> send(ProducerRecord<K, V> record)
消息發送,該方法默認爲異步發送,若是要實現同步發送的效果,對返回結果調用 get 方法便可,該方法將在下篇文章中詳細介紹。
- Future< RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
消息發送,支持回調。
- void flush()
忽略 linger.ms 的值,直接喚醒發送線程,將緩衝區中的消息所有發送到 broker。
- List< PartitionInfo> partitionsFor(String topic)
獲取 topic 的路由信息(分區信息)。
- Map< MetricName, ? extends Metric> metrics()
獲取由生產者收集的統計信息。
- void close()
關閉發送者。
- void close(Duration timeout)
定時關閉消息發送者。
上面的方法咱們會根據須要在後續文章中進行詳細的介紹。接下來咱們看一下 KafkaProducer 的核心屬性的含義。安全
- String clientId
客戶端ID。在建立 KafkaProducer 時可經過 client.id 定義 clientId,若是未指定,則默認 producer- seq,seq 在進程內遞增,強烈建議客戶端顯示指定 clientId。
- Metrics metrics
度量的相關存儲容器,例如消息體大小、發送耗時等與監控相關的指標。
- Partitioner partitioner
分區負載均衡算法,經過參數 partitioner.class 指定。
- int maxRequestSize
調用 send 方法發送的最大請求大小,包括 key、消息體序列化後的消息總大小不能超過該值。經過參數 max.request.size 來設置。
- long totalMemorySize
生產者緩存所佔內存的總大小,經過參數 buffer.memory 設置。
- Metadata metadata
元數據信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。
- RecordAccumulator accumulator
消息記錄累積器,將在消息發送部分詳細介紹。
- Sender sender
用於封裝消息發送的邏輯,即向 broker 發送消息的處理邏輯。
- Thread ioThread
用於消息發送的後臺線程,一個獨立的線程,內部使用 Sender 來向 broker 發送消息。
- CompressionType compressionType
壓縮類型,默認不啓用壓縮,可經過參數 compression.type 配置。可選值:none、gzip、snappy、lz四、zstd。
- Sensor errors
錯誤信息收集器,當成一個 metrics,用來作監控的。
- Time time
用於獲取系統時間或線程睡眠等。
- Serializer< K> keySerializer
用於對消息的 key 進行序列化。
- Serializer< V> valueSerializer
對消息體進行序列化。
- ProducerConfig producerConfig
生產者的配置信息。
- long maxBlockTimeMs
最大阻塞時間,當生產者使用的緩存已經達到規定值後,此時消息發送會阻塞,經過參數 max.block.ms 來設置最多等待多久。
- ProducerInterceptors<K, V> interceptors
生產者端的攔截器,在消息發送以前進行一些定製化處理。
- ApiVersions apiVersions
維護 api 版本的相關元信息,該類只能在 kafka 內部使用。
- TransactionManager transactionManager
kafka 消息事務管理器。
- TransactionalRequestResult initTransactionsResult
kafka 生產者事務上下文環境初始結果。
通過上面的梳理,詳細讀者朋友對 KafkaProducer 消息生產者有了一個大概的認識,下一篇會重點介紹消息發送流程。接下來咱們以一個簡單的示例結束本文的學習。併發
三、KafkaProducer 簡單示例
package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 100; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
RecordMetadata recordMetadata = future.get();
System.out.printf("offset:" + recordMetadata.offset());
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
本文就介紹到這裏,其主要的目的是瞭解Kafka 的 Producer,引出後續須要學習的內容,下一篇將重點講述 Kafka 消息的發送流程,敬請關注。app
若是本文對你們有所幫助的話,麻煩幫忙點個贊,謝謝。負載均衡
做者介紹:
丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。歡迎加入個人知識星球,構建一個高質量的技術交流社羣。
![在這裏插入圖片描述](http://static.javashuo.com/static/loading.gif)