Kafka核心API——Producer生產者

Producer異步發送演示

上文中介紹了AdminClient API的使用,如今咱們已經知道如何在應用中經過API去管理Kafka了。但在大多應用開發中,咱們最常面臨的場景就是發送消息到Kafka,或者從Kafka中消費消息,也就是典型的生產/消費模式。而本文將要演示的就是如何使用Producer API將消息發送至Kafka中,使應用成爲一個生產者。java

Producer API具備如下幾種發送模式:算法

  • 異步發送
  • 異步阻塞發送
  • 異步回調發送

接下來,使用一個簡單的例子演示一下異步向Kafka發送消息。首先,咱們須要建立一個Producer實例,而且必須配置三個參數,分別是Kafka服務的ip地址及端口號,以及消息key和value的序列化器(消息體以key-value結構形式存在)。apache

在本例中,消息的key和value均爲String類型,因此使用StringSerializer這個字符串類型的序列化器。代碼示例:安全

/**
 * 建立Producer實例
 */
public static Producer<String, String> createProducer() {
    Properties properties = new Properties();
    // 指定Kafka服務的ip地址及端口號
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    // 指定消息key的序列化器
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // 指定消息value的序列化器
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    return new KafkaProducer<>(properties);
}

new KafkaProducer時,構造器裏作了什麼:bash

  • 讀取Properties裏的配置項,初始化ProducerConfig
  • 基於ProducerConfig初始化一些配置字段
  • 初始化MetricConfig監控度量指標配置以及MetricsReporter報告器列表和Metrics存儲庫
  • 從配置中加載partitioner負載均衡器,當有多個partition時就是經過這個負載均衡器去將消息均勻的分發到不一樣的partition中
  • 從配置中加載消息key和value的序列化器(Serializer)
  • 初始化RecordAccumulator,一個相似於計數器的東西,用於計算消息批次的。由於Producer並非接收到一條消息就發送到一條消息,而是達到必定批量後按批次發送的,因此須要有一個計數器來存儲和計算批次。
  • 初始化用於發送消息的Sender,而後會爲其建立一個守護線程,並啓動

Tips:app

  • 若是細看了KafkaProducer構造器的源碼,就會發現其全部的屬性都是final的,而且均在構造器中完成了初始化,不存在不安全的發佈或共享變量,這也就變相說明了KafkaProducer是線程安全的

而後調用Producer中的send方法便可實現異步發送。代碼示例:負載均衡

/**
 * 演示Producer異步發送
 */
public static void producerAsyncSend() {
    String topicName = "MyTopic";
    String key = "test-key";
    String value = "this is test message!";

    try (Producer<String, String> producer = createProducer()) {
        // 構建消息對象
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, key, value);
        // 發送一條消息
        producer.send(record);
    }
}

producer.send(record)裏主要作了如下事情:異步

  • 使用序列化器去序列化消息的key和value
  • 計算分區,即計算消息具體進入哪個partition,也就是一個負載均衡的過程
  • 計算批次,判斷是否須要建立新的批次,而後都須要調用accumulator.append向批次中追加消息
  • 當批次滿了,調用sender.wakeup在守護線程中去發送消息

大體時序圖以下:
Kafka核心API——Producer生產者ide

發送消息的具體流程圖以下:
Kafka核心API——Producer生產者函數


Producer異步阻塞發送演示

send方法會有一個Future類型的返回值,當咱們調用Futureget方法時,就會阻塞當前線程,此時就達到了異步阻塞發送消息的效果,即發送消息是異步的,獲取結果是阻塞的。咱們能夠經過這種方式去獲取Future裏存儲的元數據信息。代碼示例:

/**
 * 演示Producer異步阻塞式發送
 */
public static void producerAsyncBlockSend() throws Exception {
    String topicName = "MyTopic";
    String key = "test-key";
    String value = "this is test message!";

    try (Producer<String, String> producer = createProducer()) {
        // 構建消息對象
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, key, value);
        // 發送一條消息
        Future<RecordMetadata> future = producer.send(record);
        // 調用get時會阻塞當前線程,就能實現異步阻塞式地發送
        // 其實發送完就立刻get已經同等於同步的效果了
        RecordMetadata metadata = future.get();
        System.out.println(String.format(
                "hasTimestamp: %s, timestamp: %s, hasOffset: %s, offset: %s, partition: %s, topic: %s",
                metadata.hasTimestamp(), metadata.timestamp(),
                metadata.hasOffset(), metadata.offset(),
                metadata.partition(), metadata.topic()
        ));
    }
}

運行以上代碼,控制檯輸出內容以下:

hasTimestamp: true, timestamp: 1589637627231, hasOffset: true, offset: 5, partition: 1, topic: MyTopic

Producer異步回調發送演示

若是想要在發送完消息後獲取結果,比起直接調用Futureget方法更好的方式是使用異步回調的消息發送形式。

send方法中支持傳入一個回調函數,當消息發送完畢後,會調用回調函數並將結果看成參數傳入,此時咱們就能夠在回調函數中對結果進行處理。代碼示例:

/**
 * 演示Producer異步回調發送
 */
public static void producerAsyncCallbackSend() throws Exception {
    String topicName = "MyTopic";
    String key = "test-key";
    String value = "this is test message!";

    try (Producer<String, String> producer = createProducer()) {
        // 構建消息對象
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, key, value);
        // 發送一條消息,傳入一個回調函數,當消息發送完成後會調用傳入的回調函數
        producer.send(record, (metadata, err) -> {
            if (err != null) {
                err.printStackTrace();
            }

            System.out.println(String.format(
                    "hasTimestamp: %s, timestamp: %s, hasOffset: %s, offset: %s, partition: %s, topic: %s",
                    metadata.hasTimestamp(), metadata.timestamp(),
                    metadata.hasOffset(), metadata.offset(),
                    metadata.partition(), metadata.topic()
            ));
        });
    }
}

運行以上代碼,控制檯輸出內容以下:

hasTimestamp: true, timestamp: 1589639553024, hasOffset: true, offset: 7, partition: 1, topic: MyTopic

自定義Partition負載均衡器

在某些特殊的業務場景下咱們常常會有自定義負載均衡算法的需求,在Kafka中能夠經過實現Partitioner接口來自定義Partition負載均衡器。

本例中所實現的負載均衡算法比較簡單,就是使用keyhashcode去對partition的數量進行取餘得出partition的索引,代碼示例:

package com.zj.study.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定義Partition負載均衡器
 *
 * @author 01
 * @date 2020-05-17
 **/
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key,
                         byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {

        int partitionsNum = cluster.partitionsForTopic(topic).size();
        return key.hashCode() % partitionsNum;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

而後在建立Producer實例時,指定MyPartitioner的包名路徑便可。代碼示例:

/**
 * 建立Producer實例
 */
public static Producer<String, String> createProducer() {
    Properties properties = new Properties();
    ...
    // 指定自定義的Partition負載均衡器
    properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG
            , "com.zj.study.kafka.producer.MyPartitioner");

    return new KafkaProducer<>(properties);
}

Kafka的消息傳遞保障

咱們首先要了解一下消息的傳遞語義,通常存在三種類型語義:

  • At most once(最多一次):消息傳遞過程當中有可能丟失,丟失的消息也不會從新傳遞,其實就是保證消息不會重複發送或者重複消費
  • At least once(至少一次):消息在傳遞的過程當中不可能會丟失,丟失的消息會從新傳遞,其實就是保證消息不會丟失,可是消息有可能重複發送或者重複被消費
  • Exactly once(正好一次):這個是大多數場景須要的語義,其實就是保證消息不會丟失,也不會重複被消費,消息只傳遞一次

在Kafka中主要經過消息重發和ACK機制來保障消息的傳遞,消息重發機制主要是提升消息發送的成功率,並不能保證消息必定能發送成功。咱們能夠經過在建立Producer實例時,設置retries配置項來開啓或關閉消息重發機制,代碼示例:

// 設置的值爲0表示關閉,大於0則表示開啓
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");

另外一個消息傳遞保障機制就是ACK機制,Kafka中的ACK機制有三種模式,須要經過配置去指定。這三種配置的含義以下:

  • acks=0:
    • Producer發送消息到發送端的buffer中就直接返回了,至於這個消息有沒有真的發送到Broker Server,Producer不關心,即便消息發送失敗,上面說的消息重發機制也不起做用,因此在這種場景下,可能就會丟失消息了(這就有點相似於UDP,只管發,無論對方有沒有接收到消息)
  • acks=1:
    • Producer發送的消息必定要存儲到對應的分區的Leader副本日誌文件中才算消息發送成功,要是失敗的話,則會嘗試retry。在這種模式下,只有當消息已經存儲在Leader副本中,可是消息尚未被Follower副本同步的時候,若是Leader副本所在的broker server掛了,消息纔會丟失
  • acks=all:
    • Producer發送的消息必定要存儲到對應的分區的全部的在ISR列表中的副本日誌文件中才算消息發送成功,要是失敗的話,則會嘗試retry。這種場景下消息就很難丟失了,除非全部的副本所在的Broker Server都掛了

一樣的該配置項能夠在建立Producer實例時進行設置,代碼示例:

properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

上面的三種取值能夠根據實際的業務場景來進行設置,消息的可靠性越強的,性能確定就會越差。這三種取值就是在消息的可靠性以及性能兩個方面作一個權衡:

  • 性能要求高,但可靠性要求低的,能夠選擇acks=0
  • 性能和可靠性都但願可以兼顧的,就選擇acks=1
  • 若容許犧牲性能來保證高可靠的場景,則選擇acks=all
相關文章
相關標籤/搜索