KafkaProducer 簡析

使用方式

KafkaProducer 發送消息主要有如下 3 種方式:html

Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
    
    // 發送並忘記(fire-and-forget)
    producer.send(record);

    // 同步發送
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();

    // 異步發送
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            
        }
    });

    producer.close();

具體的發送流程能夠參考 KafkaProducer發送流程簡析java

KafkaProducer 是線程安全的,多個線程能夠共享同一個 KafkaProducer 對象。算法

配置解析

client.id

 該參數能夠是任意的字符串,broker 會用它來識別消息的來源,會在日誌和監控指標裏展現。apache

bootstrap.servers

 該屬性指定 broker 的地址列表。
 清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找到其餘 broker 的信息。
 不過建議至少要提供兩個 broker 的信息,一旦其中一個宕機,生產者仍然可以鏈接到集羣上。bootstrap

key.serializer & value.serializer

 這兩個屬性必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer接口的類。
 生產者會使用這個類把鍵值對象序列化成字節數組。數組

receive.buffer.bytes & send.buffer.bytes

 設置 socket 讀寫數據時用到的 TCP 緩衝區大小。若是它們被設爲 -1,就使用操做系統的默認值。
 當生產者或消費者與 broker 處於不一樣的機房時,能夠適當增大這些值安全

buffer.memory

 設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。
 若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。
 此時KafkaProducer.send()會阻塞等待內存釋放,等待時間超過 max.block.ms 後會拋出超時異常。服務器

compression.type

 該參數指定了消息被髮送給 broker 以前,使用哪種壓縮算法(snappygziplz4)進行壓縮。
 使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向 Kafka 發送消息的瓶頸所在。網絡

batch.size

 該參數指定了一個批次可使用的內存字節數(而不是消息個數)。
 消息批次ProducerBatch包含了一組將要發送至同個分區的消息,當批次被填滿,批次裏的全部消息會被當即發送出去。app

 不過生產者並不必定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也可能被髮送。
 因此就算把批次大小設置得很大,也不會形成延遲,只是會佔用更多的內存而已。
 但若是設置得過小,生產者會頻繁地發送消息,會增長一些額外的網絡開銷。

linger.ms

 該參數指定了生產者在發送批次以前等待的時間。
 生產者會在批次填滿或等待時間達到 linger.ms 時把批次發送出去。
 設置linger.ms>0會增長延遲,但也會提高吞吐量(一次性發送更多的消息,每一個消息的開銷就變小了)。

acks

 參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。
 這個參數決定令消息丟失的可能性:

  • acks=0 生產者發出消息後不等待來自服務器的響應
    若是當中出現了問題,致使服務器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。
    不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。

  • acks=1 只要集羣的 leader 節點收到消息,生產者就會收到一個來自服務器的成功響應
    若是消息沒法到達 leader 節點(好比:leader節點崩潰,新的 leader 尚未被選舉出來),生產者會收到一個錯誤響應。
    爲了不數據丟失,生產者會重發消息。不過,若是一個沒有收到消息的節點成爲新 leader,消息仍是會丟失。

    這個時候的吞吐量取決於使用的是同步發送仍是異步發送:

    • 發送端阻塞等待服務器的響應(經過調用 Future.get() 方法),顯然會增長延遲(在網絡上傳輸一個來回的延遲)
    • 發送端使用回調能夠緩解延遲問題,不過吞吐量仍受在途消息數量的限制(好比:生產者在收到服務器響應以前能夠發送多少個消息)
  • acks=all 只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應
    這種模式是最安全的,就算有服務器發生崩潰,數據也不會丟失。
    不過,它的延遲比 acks=1 時更高,由於咱們要等待不僅一個服務器節點接收消息。

retries

 該參數決定了生產者能夠重發消息的次數(每次重試之間等待 retry.backoff.ms)。
 服務器返回臨時性的錯誤(好比:分區找不到 leader)時,生產者會自動重試,不必在代碼邏輯裏處理可重試的錯誤。
 做爲開發者,只須要處理那些不可重試的錯誤(好比:消息字節數超過單個發送批次上限)或重試次數超出上限的狀況便可。

max.in.flight.requests.per.connection

 該參數指定生產者,最多能夠發送未響應在途消息批次數量。
 在途消息批次越多,會佔用更多的內存,不過也會提高吞吐量。

 當retries > 0max.in.flight.requests.per.connection > 1時,可能出現消息亂序。
 若是第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。
 若是此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。

 通常不建議設置retries=0,而是令max.in.flight.requests.per.connection = 1來保證消息順序。
 在生產者嘗試發送第一批消息時,就不會有其餘的消息發送給 broker,即便發生重試消息也不會亂序。
 不過這樣會嚴重影響生產者的吞吐量,因此只有在對消息的順序有嚴格要求的狀況下才能這麼作。

高級特性

冪等

當 broker 失效時生產者可能會自動重試,致使一條消息被重複寫入屢次。
爲了不這種狀況,Kafka 在生產者端提供來冪等保證:同一條消息被生產者發送屢次,但在 broker端這條消息只會被寫入日誌一次

在發送端設置 enable.idempotence = true 能夠開啓冪等性,此時配置同時知足如下條件:

  • max.in.flight.requests.per.connection ≤ 5
  • retries > 0
  • acks = all

其工做機制以下:

  • producer 在初始化時必須分配一個 PIDproducer id該過程對用戶來講是徹底透明的)
  • 發送到 broker 端的每批消息都會被賦予一個單調遞增的 SNsequence number用於消息去重(每一個分區都有獨立的序列號)
  • 接收到消息的 broker 會將批次的(PID, SN)信息一同持久化到對應的分區日誌中(保證 leader 切換後去重仍然生效)

若重試致使 broker 接收到小於或等於已知最大序列號的消息,broker 會拒絕寫入這些消息,從而保證每條消息也只會被保存在日誌中一次。
因爲每一個 producer 實例都會被分配不一樣的 PID,該機制只能保證單個 producer 實例的冪等性,沒法實現協同多個 producer 實現冪等。

事務

Kafka 事務能夠實現 producer 對多個主題和分區的原子寫入,而且保證 consumer 不會讀取到未提交的數據。

Kafka 要求應用程序必須提供一個全局惟一的 TIDtransactional id

初始化時,producer 首先要向 broker 集羣註冊其 TID,broker 會根據給定的 TID 檢查是否存在未完成的事務。

若是某個 producer 實例失效,該機制可以保證下一個擁有相同 TID 的實例首先完成以前未完成的事務。

此外,broker 還會爲自動每一個 producer 分配一個epoch用於隔離fencing out失效但仍存活的 producer:

當 producer 參與事務時,broker 會檢查是否存在相同的 TID 且 epoch 更大的活躍 producer。

若是存在,則認爲當前 producer 是一個殭屍實例zombie instance並拒絕爲其提供服務,防止其破壞事務的完整性。

下面是兩個常見的應用場景:

實現跨主題原子寫入

Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("enable.idempotence", "true"); // 開啓冪等
    properties.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record1 = new ProducerRecord<>("topic1", "key1", "value1");
    ProducerRecord<String, String> record2 = new ProducerRecord<>("topic2", "key2", "value2");
    ProducerRecord<String, String> record3 = new ProducerRecord<>("topic3", "key3", "value3");

    producer.initTransactions(); // 初始化事務(只需執行一次)
    try {
        producer.beginTransaction(); // 開始事務

        // 向多個不一樣的 topic 寫入消息
        producer.send(record1);
        producer.send(record2);
        producer.send(record3);

        producer.commitTransaction(); // 提交事務
    } catch (ProducerFencedException e) {
        producer.close(); // 事務ID 已被佔用
    } catch (KafkaException e) {
        producer.abortTransaction();
    }

實現 read-process-write 模式

final String groupID = "my-group-id";

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");
    producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.setProperty("enable.idempotence", "true"); // 開啓冪等
    producerProps.setProperty("transactional.id", "my-transaction-id"); // 設置事務ID

    Properties consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", "localhost:9092");
    consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.setProperty("isolation.level","read_committed"); // 設置隔離級別
    consumerProps.setProperty("group.id", groupID); // 設置消費者組羣ID

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

    producer.initTransactions();
    consumer.subscribe(Collections.singletonList("ping"));

    while (true) {
        
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 讀取消息
        
        try {
            producer.beginTransaction(); // 開啓事務

            // 處理消息(能夠是任意業務場景)
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for(ConsumerRecord<String, String> record : records){
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())); // 記錄消費偏移量
                producer.send(new ProducerRecord<>("pong", record.value())); // 發送消息
            }

            producer.sendOffsetsToTransaction(offsets, groupID); // 提交消費偏移量
            producer.commitTransaction(); // 事務提交
        } catch (ProducerFencedException e) {
            producer.close(); // 事務ID 已被佔用
        } catch (Exception e){
            producer.abortTransaction(); // 回滾事務
        }
    }

參考資料

相關文章
相關標籤/搜索