Kafka Producer發送消息

正常的發送邏輯

  1. 配置生產者客戶端參數及建立相對應的生產者實例。
  2. 構建待發送消息。
  3. 發送消息。
  4. 關閉生產者實例。

生產者代碼以下:java

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
複製代碼
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerStart {

    private static final String topic = "topic-demo";
    private static final String brokeList = "localhost:9092";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokeList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
// props.put("acks", "all");
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// props.put("client.id", "producer.client.id.demo");
// props.put("bootstrap.server", brokeList);
        return props;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello,Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
複製代碼

先來看看KafkaProducer類的註釋,通常來講,開源項目源碼都是言簡意賅apache

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources. The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency. The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. 複製代碼

我來翻譯一下: 第一句意思是生成者由一個緩衝空間池組成,其中保存了還沒有傳輸到服務器的消息 以及有個後臺I/O線程負責將這些消息轉換爲請求並將其傳輸到集羣的。這說明Producer類並非真正的發送類,而是將消息存儲起來,等待另外一個線程將消息發送到服務端。bootstrap

第二句,send()方法是異步的,當被調用時,它將消息添加到掛起消息發送的緩衝區中並當即返回。這容許生產者批量處理單個記錄以提升效率。這裏也說明Producer不是真正的發送者,而是將消息存在緩衝區中就返回。bash

第三局,acks配置控制了被認爲是完整請求的條件。示例代碼中指定acks=all,這將致使阻塞記錄的完整提交,這是最慢但最持久的設置。查看org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG發現,下面還有個ACKS的解釋ACKS_DOC。 這個參數用來指定分區中必須有多少個副本收到這條消息,以後生產者纔會認爲這條消息是成功寫入的。acks有3種類型的值(字符串類型)服務器

  • acks = 0 。那麼生產者將根本不會等待任何來自服務器的確認。該記錄將當即被添加到套接字緩衝區並被認爲已發送。在這種狀況下,不能保證服務器已經收到了記錄,重試配置不會生效(由於客戶端一般不會知道任何失敗)。返回給每條記錄的偏移量將會老是設置爲-1。
  • acks = 1。這將意味着leader將記錄寫到它的本地日誌中,就會響應成功,不須要等待其餘副本的確認。在這種狀況下,若是leader在確認記錄以後失敗了,可是在follow副本複製以前,記錄就會丟失。
  • acks = -1 或 acks = all。這意味着leader將等待完整的同步副本集合(in-sync replicas)認可記錄。這保證了只要有至少一個同步副本,記錄就不會丟失.acks=all能夠達到最強的可靠性

消息的發送

發送消息主要有三種模式:發後即忘(fire-and-forget),同步(sync),異步(async)。示例代碼中用的就是發後即忘,只管往Kafka中發送消息而不關係消息是否正確到達。在大多數狀況下,這種發送方式沒有什麼問題,不過在某些時候會形成消息的丟失。該方式性能最高,可靠性最差。 KakfaProducer的send()方法是 Future 類型。send有兩個重載方法,具體定義以下:異步

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }
 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) 複製代碼

同步,能夠利用返回的Future對象實現,示例以下:

try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
複製代碼

實際上send方法自己就是異步的。調用get()方法來阻塞等待kafka的響應,直到消息發送成功或者發生異常。若是發生異常,那麼須要捕獲並交由外層邏輯處理。 KakfaProducer中通常會出現兩種類型異常:一種是可重試異常與不可重試異常。 對於可重試異常,若是配置了retries參數,那麼只要在規定的重試次數內自行恢復,就不會拋出異常。retries參數默認爲0,配置方式以下:async

props.put(ProducerConfig.RETRIES_CONFIG,3);
//這表示配置了3次重試,若是重試了3次以後尚未恢復,那麼仍會拋出異常,進外層邏輯就要處理這些異常了
複製代碼

同步發送的方式性能很低,須要阻塞等待一條消息發送完之後才能發出下一條。ide

異步發送的方式

通常是send()方法裏面指定一個Callback的回調函數,Kafka在返回響應時調用該函數來實現異步的發送確認。 使用Callback的方式很是簡單明瞭,Kafka有響應就會回調,要麼發送成功要麼拋出異常,發送方式以下:函數

producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        //異常不爲空,說明發送失敗
                        //實際項目中能夠根據具體業務進行相應的處理
                        exception.printStackTrace();
                    } else {
                        //說明消息發送成功
                        System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
                    }
                }
            });
複製代碼

onCompletion()方法裏面的兩個參數是互斥的。之因此是互斥的,是由於這個。這個下次再講post

org.apache.kafka.clients.producer.internals.ProducerBatch#completeFutureAndFireCallbacks
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                // 判斷異常是否爲空
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();
    }

複製代碼

消息發送成功時,metadata不爲null而exception爲null;反之metadata爲null而exception不爲null。

下一篇文章Kafka發送者源碼解析

若是有地方有疑惑或者寫的有很差,能夠評論或者經過郵箱聯繫我creazycoder@sina.com

相關參考: 《深刻理解Kafka核心設計與實踐原理》

相關文章
相關標籤/搜索