初識 Kafka Producer 生產者

>舒適提示:整個 Kafka Client 專欄基於 kafka-2.3.0 版本。java

一、KafkaProducer 概述

根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具備以下特徵:算法

  • KafkaProducer 是線程安全的,能夠被多個線程交叉使用。apache

  • KafkaProducer 內部包含一個緩存池,存放待發送消息,即 ProducerRecord 隊列,與此同時會開啓一個IO線程將 ProducerRecord 對象發送到 Kafka 集羣。bootstrap

  • KafkaProducer 的消息發送 API send 方法是異步,只負責將待發送消息 ProducerRecord 發送到緩存區中,當即返回,並返回一個結果憑證 Future。api

  • acks KafkaProducer 提供了一個核心參數 acks 用來定義消息「已提交」的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值以下:緩存

    • 0 表示生產者不關係該條消息在 broker 端的處理結果,只要調用 KafkaProducer 的 send 方法返回後即認爲成功,顯然這種方式是最不安全的,由於 Broker 端可能壓根都沒有收到該條消息或存儲失敗。
    • all 或 -1 表示消息不只須要 Leader 節點已存儲該消息,而且要求其副本(準確的來講是 ISR 中的節點)所有存儲才認爲已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,固然性能也最低。
    • 1 表示消息只須要寫入 Leader 節點後就能夠向客戶端返回提交成功。
  • retries kafka 在生產端提供的另一個核心屬性,用來控制消息在發送失敗後的重試次數,設置爲 0 表示不重試,重試就有可能形成消息在發送端的重複。安全

  • batch.size kafka 消息發送者爲每個分區維護一個未發送消息積壓緩存區,其內存大小由batch.size指定,默認爲 16K。 但若是緩存區中不足100條,但發送線程此時空閒,是須要等到緩存區中積滿100條才能發送仍是能夠當即發送呢?默認是當即發送,即 batch.size 的做用實際上是客戶端一次發送到broker的最大消息數量。架構

  • linger.ms 爲了提升 kafka 消息發送的高吞吐量,即控制在緩存區中未積滿 batch.size 時來控制 消息發送線程的行爲,是當即發送仍是等待必定時間,若是linger.ms 設置爲 0表示當即發送,若是設置爲大於0,則消息發送線程會等待這個值後纔會向broker發送。該參數者會增長響應時間,但有利於增長吞吐量。有點相似於 TCP 領域的 Nagle 算法。併發

  • buffer.memory 用於控制消息發送者緩存的總內存大小,若是超過該值,往緩存區中添加消息會被阻塞,具體會在下文的消息發送流程中詳細介紹,阻塞的最大時間可經過參數 max.block.ms 設置,阻塞超過該值會拋出超時異常。app

  • key.serializer 指定 key 的序列化處理器。

  • value.serializer 指定 消息體的序列化處理器。

  • enable.idempotence 從 kafka0.11版本開始,支持消息傳遞冪等,能夠作到消息只會被傳遞一次,經過 enable.idempotence 爲 true 來開啓。若是該值設置爲 true,其 retries 將設置爲 Integer.MAX_VALUE,acks 將被設置爲 all。爲了確保消息發送冪等性,必須避免應用程序端的任何重試,而且若是消息發送API若是返回錯誤,應用端應該記錄最後成功發送的消息,避免消息的重複發送。

從Kafka 0.11開始,kafka 也支持事務消息。

二、KafkaProducer 類圖

在這裏插入圖片描述

在 Kafka 中,生產者經過接口 Producer 定義,經過該接口的方法,咱們基本能夠得知 KafkaProducer 將具有以下基本能力:

  • void initTransactions() 初始化事務,若是須要使用事務方法,該方法必須首先被調用。
  • void beginTransaction() 開啓事務。
  • void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) 向消費組提交當前事務中的消息偏移量,將在介紹 Kafka 事務相關文章中詳細介紹。
  • void commitTransaction() 提交事務。
  • void abortTransaction() 回滾事務。
  • Future< RecordMetadata> send(ProducerRecord<k, v> record) 消息發送,該方法默認爲異步發送,若是要實現同步發送的效果,對返回結果調用 get 方法便可,該方法將在下篇文章中詳細介紹。
  • Future< RecordMetadata> send(ProducerRecord<k, v> record, Callback callback) 消息發送,支持回調。
  • void flush() 忽略 linger.ms 的值,直接喚醒發送線程,將緩衝區中的消息所有發送到 broker。
  • List< PartitionInfo> partitionsFor(String topic) 獲取 topic 的路由信息(分區信息)。
  • Map< MetricName, ? extends Metric> metrics() 獲取由生產者收集的統計信息。
  • void close() 關閉發送者。
  • void close(Duration timeout) 定時關閉消息發送者。

上面的方法咱們會根據須要在後續文章中進行詳細的介紹。接下來咱們看一下 KafkaProducer 的核心屬性的含義。

  • String clientId 客戶端ID。在建立 KafkaProducer 時可經過 client.id 定義 clientId,若是未指定,則默認 producer- seq,seq 在進程內遞增,強烈建議客戶端顯示指定 clientId。
  • Metrics metrics 度量的相關存儲容器,例如消息體大小、發送耗時等與監控相關的指標。
  • Partitioner partitioner 分區負載均衡算法,經過參數 partitioner.class 指定。
  • int maxRequestSize 調用 send 方法發送的最大請求大小,包括 key、消息體序列化後的消息總大小不能超過該值。經過參數 max.request.size 來設置。
  • long totalMemorySize 生產者緩存所佔內存的總大小,經過參數 buffer.memory 設置。
  • Metadata metadata 元數據信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。
  • RecordAccumulator accumulator 消息記錄累積器,將在消息發送部分詳細介紹。
  • Sender sender 用於封裝消息發送的邏輯,即向 broker 發送消息的處理邏輯。
  • Thread ioThread 用於消息發送的後臺線程,一個獨立的線程,內部使用 Sender 來向 broker 發送消息。
  • CompressionType compressionType 壓縮類型,默認不啓用壓縮,可經過參數 compression.type 配置。可選值:none、gzip、snappy、lz四、zstd。
  • Sensor errors 錯誤信息收集器,當成一個 metrics,用來作監控的。
  • Time time 用於獲取系統時間或線程睡眠等。
  • Serializer< K> keySerializer 用於對消息的 key 進行序列化。
  • Serializer< V> valueSerializer 對消息體進行序列化。
  • ProducerConfig producerConfig 生產者的配置信息。
  • long maxBlockTimeMs 最大阻塞時間,當生產者使用的緩存已經達到規定值後,此時消息發送會阻塞,經過參數 max.block.ms 來設置最多等待多久。
  • ProducerInterceptors<k, v> interceptors 生產者端的攔截器,在消息發送以前進行一些定製化處理。
  • ApiVersions apiVersions 維護 api 版本的相關元信息,該類只能在 kafka 內部使用。
  • TransactionManager transactionManager kafka 消息事務管理器。
  • TransactionalRequestResult initTransactionsResult kafka 生產者事務上下文環境初始結果。

通過上面的梳理,詳細讀者朋友對 KafkaProducer 消息生產者有了一個大概的認識,下一篇會重點介紹消息發送流程。接下來咱們以一個簡單的示例結束本文的學習。

三、KafkaProducer 簡單示例

package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<string, string> producer = new KafkaProducer&lt;&gt;(props);
        try {
            for (int i = 0; i &lt; 100; i++) {
                Future<recordmetadata>  future = producer.send(new ProducerRecord<string, string>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
                RecordMetadata recordMetadata = future.get();
                System.out.printf("offset:" + recordMetadata.offset());
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

本文就介紹到這裏,其主要的目的是瞭解Kafka 的 Producer,引出後續須要學習的內容,下一篇將重點講述 Kafka 消息的發送流程,敬請關注。

若是本文對你們有所幫助的話,麻煩幫忙點個贊,謝謝。


做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。

</string,></recordmetadata></string,></k,></k,></k,>

相關文章
相關標籤/搜索