Kafka 系列(三)—— Kafka 生產者詳解

1、生產者發送消息的過程

首先介紹一下 Kafka 生產者發送消息的過程:java

  • Kafka 會將發送消息包裝爲 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還能夠指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成字節數組,這樣它們纔可以在網絡上傳輸。
  • 接下來,數據被傳給分區器。若是以前已經在 ProducerRecord 對象裏指定了分區,那麼分區器就不會再作任何事情。若是沒有指定分區 ,那麼分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。
  • 服務器在收到這些消息時會返回一個響應。若是消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量。若是寫入失敗,則會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,若是達到指定的重試次數後尚未成功,則直接拋出異常,再也不重試。

2、建立生產者

2.1 項目依賴

本項目採用 Maven 構建,想要調用 Kafka 生產者 API,須要導入 kafka-clients 依賴,以下:git

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 建立生產者

建立 Kafka 生產者時,如下三個屬性是必須指定的:github

  • bootstrap.servers :指定 broker 的地址清單,清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找 broker 的信息。不過建議至少要提供兩個 broker 的信息做爲容錯;
  • key.serializer :指定鍵的序列化器;
  • value.serializer :指定值的序列化器。

建立的示例代碼以下:算法

public class SimpleProducer {

    public static void main(String[] args) {

        String topicName = "Hello-Kafka";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*建立生產者*/
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, 
                                                                         "world" + i);
            /* 發送消息*/
            producer.send(record);
        }
        /*關閉生產者*/
        producer.close();
    }
}

本篇文章的全部示例代碼能夠從 Github 上進行下載:kafka-basisshell

2.3 測試

1. 啓動Kakfa

Kafka 的運行依賴於 zookeeper,須要預先啓動,能夠啓動 Kafka 內置的 zookeeper,也能夠啓動本身安裝的:apache

# zookeeper啓動命令
bin/zkServer.sh start

# 內置zookeeper啓動命令
bin/zookeeper-server-start.sh config/zookeeper.properties

啓動單節點 kafka 用於測試:bootstrap

# bin/kafka-server-start.sh config/server.properties

2. 建立topic

# 建立用於測試主題
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 1 \
                     --topic Hello-Kafka

# 查看全部主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 啓動消費者

啓動一個控制檯消費者用於觀察寫入狀況,啓動命令以下:api

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

4. 運行項目

此時能夠看到消費者控制檯,輸出以下,這裏 kafka-console-consumer 只會打印出值信息,不會打印出鍵信息。數組

2.4 可能出現的問題

在這裏可能出現的一個問題是:生產者程序在啓動後,一直處於等待狀態。這一般出如今你使用默認配置啓動 Kafka 的狀況下,此時須要對 server.properties 文件中的 listeners 配置進行更改:服務器

# hadoop001 爲我啓動kafka服務的主機名,你能夠換成本身的主機名或者ip地址
listeners=PLAINTEXT://hadoop001:9092

2、發送消息

上面的示例程序調用了 send 方法發送消息後沒有作任何操做,在這種狀況下,咱們沒有辦法知道消息發送的結果。想要知道消息發送的結果,可使用同步發送或者異步發送來實現。

2.1 同步發送

在調用 send 方法後能夠接着調用 get() 方法,send 方法的返回值是一個 Future<RecordMetadata>對象,RecordMetadata 裏面包含了發送消息的主題、分區、偏移量等信息。改寫後的代碼以下:

for (int i = 0; i < 10; i++) {
    try {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
        /*同步發送消息*/
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s, partition=%d, offset=%s \n",
                metadata.topic(), metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

此時獲得的輸出以下:偏移量和調用次數有關,全部記錄都分配到了 0 分區,這是由於在建立 Hello-Kafka 主題時候,使用 --partitions 指定其分區數爲 1,即只有一個分區。

topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41 
topic=Hello-Kafka, partition=0, offset=42 
topic=Hello-Kafka, partition=0, offset=43 
topic=Hello-Kafka, partition=0, offset=44 
topic=Hello-Kafka, partition=0, offset=45 
topic=Hello-Kafka, partition=0, offset=46 
topic=Hello-Kafka, partition=0, offset=47 
topic=Hello-Kafka, partition=0, offset=48 
topic=Hello-Kafka, partition=0, offset=49

2.2 異步發送

一般咱們並不關心發送成功的狀況,更多關注的是失敗的狀況,所以 Kafka 提供了異步發送和回調函數。 代碼以下:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
    /*異步發送消息,並監聽回調*/
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("進行異常處理");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}

3、自定義分區器

Kafka 有着默認的分區機制:

  • 若是鍵值爲 null, 則使用輪詢 (Round Robin) 算法將消息均衡地分佈到各個分區上;
  • 若是鍵值不爲 null,那麼 Kafka 會使用內置的散列算法對鍵進行散列,而後分佈到各個分區上。

某些狀況下,你可能有着本身的分區需求,這時候能夠採用自定義分區器實現。這裏給出一個自定義分區器的示例:

3.1 自定義分區器

/**
 * 自定義分區器
 */
public class CustomPartitioner implements Partitioner {

    private int passLine;

    @Override
    public void configure(Map<String, ?> configs) {
        /*從生產者配置中獲取分數線*/
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                         byte[] valueBytes, Cluster cluster) {
        /*key 值爲分數,當分數大於分數線時候,分配到 1 分區,不然分配到 0 分區*/
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
        System.out.println("分區器關閉");
    }
}

須要在建立生產者時指定分區器,和分區器所須要的配置參數:

public class ProducerWithPartitioner {

    public static void main(String[] args) {

        String topicName = "Kafka-Partitioner-Test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /*傳遞自定義分區器*/
        props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
        /*傳遞分區器所需的參數*/
        props.put("pass.line", 6);

        Producer<Integer, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String score = "score:" + i;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
            /*異步發送消息*/
            producer.send(record, (metadata, exception) ->
                    System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
        }

        producer.close();
    }
}

3.2 測試

須要建立一個至少有兩個分區的主題:

bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 2 \
                     --topic Kafka-Partitioner-Test

此時輸入以下,能夠看到分數大於等於 6 分的都被分到 1 分區,而小於 6 分的都被分到了 0 分區。

score:6, partition=1, 
score:7, partition=1, 
score:8, partition=1, 
score:9, partition=1, 
score:10, partition=1, 
score:0, partition=0, 
score:1, partition=0, 
score:2, partition=0, 
score:3, partition=0, 
score:4, partition=0, 
score:5, partition=0, 
分區器關閉

4、生產者其餘屬性

上面生產者的建立都僅指定了服務地址,鍵序列化器、值序列化器,實際上 Kafka 的生產者還有不少可配置屬性,以下:

1. acks

acks 參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的:

  • acks=0 : 消息發送出去就認爲已經成功了,不會等待任何來自服務器的響應;
  • acks=1 : 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器成功響應;
  • acks=all :只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。

2. buffer.memory

設置生產者內存緩衝區的大小。

3. compression.type

默認狀況下,發送的消息不會被壓縮。若是想要進行壓縮,能夠配置此參數,可選值有 snappy,gzip,lz4。

4. retries

發生錯誤後,消息重發的次數。若是達到設定值,生產者就會放棄重試並返回錯誤。

5. batch.size

當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。

6. linger.ms

該參數制定了生產者在發送批次以前等待更多消息加入批次的時間。

7. clent.id

客戶端 id,服務器用來識別消息的來源。

8. max.in.flight.requests.per.connection

指定了生產者在收到服務器響應以前能夠發送多少個消息。它的值越高,就會佔用越多的內存,不過也會提高吞吐量,把它設置爲 1 能夠保證消息是按照發送的順序寫入服務器,即便發生了重試。

9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

  • timeout.ms 指定了 borker 等待同步副本返回消息的確認時間;
  • request.timeout.ms 指定了生產者在發送數據時等待服務器返回響應的時間;
  • metadata.fetch.timeout.ms 指定了生產者在獲取元數據(好比分區首領是誰)時等待服務器返回響應的時間。

10. max.block.ms

指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

11. max.request.size

該參數用於控制生產者發送的請求大小。它能夠指發送的單個消息的最大值,也能夠指單個請求裏全部消息總的大小。例如,假設這個值爲 1000K ,那麼能夠發送的單個最大消息爲 1000K ,或者生產者能夠在單個請求裏發送一個批次,該批次包含了 1000 個消息,每一個消息大小爲 1K。

12. receive.buffer.bytes & send.buffer.byte

這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 表明使用操做系統的默認值。

參考資料

  1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命燈 (譯) . Kafka 權威指南 . 人民郵電出版社 . 2017-12-26

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索