Kafka學習筆記(6)----Kafka使用Producer發送消息

1. Kafka的Producer

  不論將kafka做爲何樣的用途,都少不了的向Broker發送數據或接受數據,Producer就是用於向Kafka發送數據。以下:java

  

2. 添加依賴

  pom.xml文件以下:node

 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.0</version>
    </dependency>

3. 發送消息

  3.1 建立生產者

  建立生產者的時候,咱們須要爲生產者設置一些屬性,其中有三個必選屬性以下:算法

  1. bootstrap.servers: 該屬性指定broker 的地址清單,地址的格式爲host:po 忱。清單裏不須要包含全部的broker 地址,生產者會給定的broker 裏查找到其餘broker 的信息。不過建議至少要提供兩個broker 的信息, 一且其中一個若機,生產者仍然可以鏈接到集羣上。apache

  2. key.serializer: broker 但願接收到的消息的鍵和值都是字節數組。生產者接口容許使用參數化類型,所以能夠把Java 對象做爲鍵和值發送給broker 。這樣的代碼具備良好的可讀性,不過生產者須要知道如何把這些Java 對象轉換成字節數組。key. serializer必須被設置爲一實現了org.apache.kafka.common.serialization.StringSerializer接口的類,生產者會使用這個類把鍵對象序列化成字節數組。Kafka 客戶端默認提供了ByteArraySerializer(這個只作不多的事情)、StringSerializer和IntegeSerializer,所以,若是你只使用常見的幾種Java 對象類型,那麼就不必實現本身的序列化器。要注意,  key.serializer是必須設置的,就算你打算只發送值內容。bootstrap

  3. value.serializer: 與key.serializer同樣,value.serializer指定的類會將值序列化。若是鍵和值都是字符串,可使用與key.serializer同樣的序列化器。若是鍵是整數類型而值是字符串,那麼須要使用不一樣的序列化器。數組

  設置屬性代碼以下:安全

  Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value容許使用參數化類型
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

  3.2 發送消息的三種方式

  1. 併發並忘記,這是普通的消息發送方式,咱們把消息發送給服務器,但井不關心它是否正常到達。大多數狀況下,消息會正常到達,由於Kafka 是高可用的,並且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些消息。服務器

  實現以下:網絡

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value容許使用參數化類型
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        kafkaProducer.send(record);

    }
}

  此時在Kafka中打開內置的消費者消費消息,結果以下,命令以下:併發

  kafka-console-consumer.sh --bootstrap-server 47.105.145.123:9092 --topic testTopic --from-beginning

  而後,啓動生產者發送消息,結果以下:

  這裏啓動了四次消費者,因此有四條消息被消費。

  3.3 同步發送消息

  咱們使用send () 方怯發送消息, 它會返回Future對象,調用get () 方法進行等待,就能夠知道悄息是否發送成功。

  實現方式以下:

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value容許使用參數化類型
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
        //建立消息對象,第一個爲參數topic,第二個參數爲key,第三個參數爲value
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        //同步發送方式,get方法返回結果
        RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get();
        System.out.println("broker返回消息發送信息" + metadata);

    }
}

  客戶端消費者仍能收到消息,且生產者也能收到返回結果,返回結果以下:

  

  3.4 異步發送消息

  假設消息在應用程序和Kafka 集羣之間一個來回須要lOm s 。若是在發送完每一個消息後都等待迴應,那麼發送100 個消息須要l秒。但若是隻發送消息而不等待響應,那麼發送100 個消息所須要的時間會少不少。大多數時候,咱們並不須要等待響應一一儘管Kafka會把目標主題、分區信息和悄息的偏移量發送回來,但對於發送端的應用程序來講不是必需的。不過在遇到消息發送失敗時,咱們須要拋出異常、記錄錯誤日誌,或者把消息寫入「錯誤消息」文件以便往後分析。

  實現以下:

package com.wangx.kafka.client;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
        //配置key-value容許使用參數化類型
        kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
        //建立消息對象,第一個爲參數topic,第二個參數爲key,第三個參數爲value
        final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

        //異步發送消息。異常時打印異常信息或發送結果
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println(e.getMessage());
                } else {
                    System.out.println("接收到返回結果:" + recordMetadata);
                }
            }
        });
        //異步發送消息時必需要flush,不然發送不成功,不會執行回調函數
        kafkaProducer.flush();
    }
}

  監聽到的返回信息以下:

  3.5 生產者的配置

  生產者還有不少能夠配置的參數,以下:

  1. acks:指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。該參數有以下選項。

  若是acks=0 , 生產者在成功寫入消息以前不會等待任何來自服務器的響應。也就是說,若是當中出現了問題, 致使服務器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。

  若是acks=1 ,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。若是消息沒法到達首領節點(好比領導節點奔潰,新的首領尚未被選舉出來),生產者會收到一個錯誤響應,爲了不數據丟失,生產者會重發消息。不過,若是一個沒有收到消息的節點成爲新首領,消息仍是會丟失。這個時候的吞吐量取決於使用的是同步發送仍是異步發送。若是讓發送客戶端等待服務器的響應(經過調用Future對象的ge t ()方法),顯然會增長延遲(在網絡上傳輸一個來回的延遲)。若是客戶端使用回調,延遲問題就能夠獲得緩解,不過吞吐量仍是會受發送中消息數量的限制(好比,生產者在收到服務器響應以前能夠發送多少個消息)。

  若是acks=all ,只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。這種模式是最安全的,它能夠保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集羣仍然能夠運行。不過,它的延遲比acks=1時更高,由於咱們要等待不僅一個服務器節點接收消息。

  2. buffer.memory: 用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。這個時候,send ()方法調用要麼被阻塞,要麼拋出異常,取決於如何設置block.on.buffer 參數(在0. 9.0.0 版本里被替換成了l'la x .block.l'ls ,表示在拋出異常以前能夠阻塞一段時間)。

  3. compression.type: 默認狀況下,消息發送時不會被壓縮。該參數能夠設置爲snappy 、gzip 或lz4 ,它指定了消息被髮到broker 以前使用哪種壓縮算法進行壓縮。ssnappy壓縮算法由Google發明,它佔用較少的CPU ,卻能提供較好的性能和至關可觀的 壓縮比,若是比較關注性能和網絡帶寬,可使用這種算法。gzip壓縮算法通常會佔用較多的CPU ,但會提供更高的壓縮比,因此若是網絡帶寬比較有限,可使用這種算法。使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向Kafka 發送消息的瓶頸所在。

  4. retries: 生產者從服務器收到的錯誤有多是臨時性的錯誤,在這種狀況下, retries 參數的值決定了生產者能夠重發消息次數,若是達到這個次數,生產者會放棄重試並返回錯誤

  5. batch.size: 當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏,該參數指定了一個批次可使用的內存大小,按照字節數計算。

  6. linger.ms: 該參數指定了生產者在發送批次以前等待更多消息加入批次的時間。

  7. client.id: 該參數能夠是任意的字符串,服務器會用它來識別消息的來源,還能夠用在日誌和配額指標裏

  8. max.in.flight.requests.per.connection: 該參數指定了生產者在收到服務器晌應以前能夠發送多少個消息

  9. timeout.ms 、request.timeout.ms 和metadata.fetch.timeout.ms:request.timeout.ms 指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數據(好比目標分區的首領是誰)時等待服務器返回響應的時間。若是等待響應超時,那麼生產者要麼重試發送數據,要麼返回一個錯誤(拋出異常或執行回調)。timeout.ms指定了broker 等待同步副本返回消息確認的時間,與asks 的配置相匹配一一若是在指定時間內沒有收到同步副本的確認,那麼broker 就會返回一個錯誤。

  10. max.block.ms:該參數指定了在調用send () 方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。

  11. max.request.size:該參數用於控制生產者發送的請求大小,能夠指能發送的單個消息的最大值,也能夠指單個請求裏面全部消息總的大小。

  12. receive.buffer.bytes 和send.buffer.bytes: 這兩個參數分別指定了TCP socket 接收和發送數據包的緩衝區大小,若是它們被設爲-1,就使用操做系統的默認值。

相關文章
相關標籤/搜索