初識 Kafka Producer 生產者

一、KafkaProducer 概述

根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具備以下特徵:java

  • KafkaProducer 是線程安全的,能夠被多個線程交叉使用。算法

  • KafkaProducer 內部包含一個緩存池,存放待發送消息,即 ProducerRecord 隊列,與此同時會開啓一個IO線程將 ProducerRecord 對象發送到 Kafka 集羣。apache

  • KafkaProducer 的消息發送 API send 方法是異步,只負責將待發送消息 ProducerRecord 發送到緩存區中,當即返回,並返回一個結果憑證 Future。bootstrap

  • acks
    KafkaProducer 提供了一個核心參數 acks 用來定義消息「已提交」的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值以下:api

    • 0
      表示生產者不關係該條消息在 broker 端的處理結果,只要調用 KafkaProducer 的 send 方法返回後即認爲成功,顯然這種方式是最不安全的,由於 Broker 端可能壓根都沒有收到該條消息或存儲失敗。緩存

    • all 或 -1
      表示消息不只須要 Leader 節點已存儲該消息,而且要求其副本(準確的來講是 ISR 中的節點)所有存儲才認爲已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,固然性能也最低。安全

    • 1
      表示消息只須要寫入 Leader 節點後就能夠向客戶端返回提交成功。app

  • retries
    kafka 在生產端提供的另一個核心屬性,用來控制消息在發送失敗後的重試次數,設置爲 0 表示不重試,重試就有可能形成消息在發送端的重複。負載均衡

  • batch.size
    kafka 消息發送者爲每個分區維護一個未發送消息積壓緩存區,其內存大小由batch.size指定,默認爲 16K。
    但若是緩存區中不足100條,但發送線程此時空閒,是須要等到緩存區中積滿100條才能發送仍是能夠當即發送呢?默認是當即發送,即 batch.size 的做用實際上是客戶端一次發送到broker的最大消息大小。異步

  • linger.ms
       爲了提升 kafka 消息發送的高吞吐量,即控制在緩存區中未積滿 batch.size  時來控制 消息發送線程的行爲,是當即發送仍是等待必定時間,若是linger.ms 設置爲 0表示當即發送,若是設置爲大於0,則消息發送線程會等待這個值後纔會向broker發送。該參數會增長響應時間,但有利於增長吞吐量。有點相似於 TCP 領域的 Nagle 算法。

  • buffer.memory
    用於控制消息發送者緩存的總內存大小,若是超過該值,往緩存區中添加消息會被阻塞,具體會在下文的消息發送流程中詳細介紹,阻塞的最大時間可經過參數 max.block.ms 設置,阻塞超過該值會拋出超時異常。

  • key.serializer
    指定 key 的序列化處理器。

  • value.serializer
    指定 消息體的序列化處理器。

  • enable.idempotence
    從 kafka0.11版本開始,支持消息傳遞冪等,能夠作到消息只會被傳遞一次,經過 enable.idempotence 爲 true 來開啓。若是該值設置爲 true,其 retries 將設置爲 Integer.MAX_VALUE,acks 將被設置爲 all。爲了確保消息發送冪等性,必須避免應用程序端的任何重試,而且若是消息發送API若是返回錯誤,應用端應該記錄最後成功發送的消息,避免消息的重複發送。

從Kafka 0.11開始,kafka 也支持事務消息。

二、KafkaProducer 類圖


在 Kafka  中,生產者經過接口 Producer 定義,經過該接口的方法,咱們基本能夠得知 KafkaProducer 將具有以下基本能力:

  • void initTransactions()
    初始化事務,若是須要使用事務方法,該方法必須首先被調用。

  • void beginTransaction()
    開啓事務。

  • void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
    向消費組提交當前事務中的消息偏移量,將在介紹 Kafka 事務相關文章中詳細介紹。

  • void commitTransaction()
    提交事務。

  • void abortTransaction()
    回滾事務。

  • Future< RecordMetadata> send(ProducerRecord

    record)
    消息發送,該方法默認爲異步發送,若是要實現同步發送的效果,對返回結果調用  get 方法便可,該方法將在下篇文章中詳細介紹。
  • Future< RecordMetadata> send(ProducerRecord

    record, Callback callback)
      消息發送,支持回調。
  • void flush()
    忽略 linger.ms 的值,直接喚醒發送線程,將緩衝區中的消息所有發送到 broker。

  • List< PartitionInfo> partitionsFor(String topic)
    獲取 topic 的路由信息(分區信息)。

  • Map< MetricName, ? extends Metric> metrics()
    獲取由生產者收集的統計信息。

  • void close()
    關閉發送者。

  • void close(Duration timeout)
    定時關閉消息發送者。

上面的方法咱們會根據須要在後續文章中進行詳細的介紹。接下來咱們看一下 KafkaProducer 的核心屬性的含義。

  • String clientId
    客戶端ID。在建立 KafkaProducer 時可經過 client.id 定義 clientId,若是未指定,則默認 producer- seq,seq 在進程內遞增,強烈建議客戶端顯示指定 clientId。

  • Metrics metrics
    度量的相關存儲容器,例如消息體大小、發送耗時等與監控相關的指標。

  • Partitioner partitioner
    分區負載均衡算法,經過參數 partitioner.class 指定。

  • int maxRequestSize
    調用 send 方法發送的最大請求大小,包括 key、消息體序列化後的消息總大小不能超過該值。經過參數 max.request.size 來設置。

  • long totalMemorySize
    生產者緩存所佔內存的總大小,經過參數 buffer.memory 設置。

  • Metadata metadata
    元數據信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。

  • RecordAccumulator accumulator
    消息記錄累積器,將在消息發送部分詳細介紹。

  • Sender sender
    用於封裝消息發送的邏輯,即向 broker 發送消息的處理邏輯。

  • Thread ioThread
    用於消息發送的後臺線程,一個獨立的線程,內部使用 Sender 來向 broker 發送消息。

  • CompressionType compressionType
    壓縮類型,默認不啓用壓縮,可經過參數 compression.type 配置。可選值:none、gzip、snappy、lz四、zstd。

  • Sensor errors
    錯誤信息收集器,當成一個 metrics,用來作監控的。

  • Time time
    用於獲取系統時間或線程睡眠等。

  • Serializer< K> keySerializer
    用於對消息的 key 進行序列化。

  • Serializer< V> valueSerializer
    對消息體進行序列化。

  • ProducerConfig producerConfig
    生產者的配置信息。

  • long maxBlockTimeMs
    最大阻塞時間,當生產者使用的緩存已經達到規定值後,此時消息發送會阻塞,經過參數 max.block.ms 來設置最多等待多久。

  • ProducerInterceptors

    interceptors
    生產者端的攔截器,在消息發送以前進行一些定製化處理。
  • ApiVersions apiVersions
    維護 api 版本的相關元信息,該類只能在 kafka 內部使用。

  • TransactionManager transactionManager
    kafka 消息事務管理器。

  • TransactionalRequestResult initTransactionsResult
    kafka 生產者事務上下文環境初始結果。

通過上面的梳理,詳細讀者朋友對 KafkaProducer 消息生產者有了一個大概的認識,下一篇會重點介紹消息發送流程。接下來咱們以一個簡單的示例結束本文的學習。

三、KafkaProducer 簡單示例

package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9082,localhost:9072,");
        props.put("acks""all");
        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                Future<RecordMetadata>  future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
                RecordMetadata recordMetadata = future.get();
                System.out.printf("offset:" + recordMetadata.offset());
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

本文就介紹到這裏,其主要的目的是瞭解Kafka 的 Producer,引出後續須要學習的內容,下一篇將重點講述 Kafka 消息的發送流程,敬請關注。

若是本文對你們有所幫助的話,麻煩幫忙點個【在看】,謝謝。

https://mp.weixin.qq.com/s/rUJSctU8qdGL-7ri_9Kgkw

相關文章
相關標籤/搜索