[Kafka][3][Kafka生產者——向Kafka寫入數據]

第3章 Kafka生產者——向Kafka寫入數據

對應代碼倉庫地址:https://github.com/T0UGH/getting-started-kafkajava

參考資料:git

kafka權威指南https://book.douban.com/subject/27665114/github

大數據通用的序列化器——Apache Avrohttps://www.jianshu.com/p/0a85bfbb9f5f算法

Kafka 中使用 Avro 序列化框架(一)https://www.jianshu.com/p/4f724c7c497dshell

Kafka 中使用 Avro 序列化組件(三):Confluent Schema Registryhttps://www.jianshu.com/p/cd6f413d35b0apache

本章對應代碼倉庫可查看https://github.com/T0UGH/getting-started-kafka/tree/main/src/main/java/cn/edu/neu/demo/ch3編程

在這一章,咱們將從Kafka生產者設計組件講起,學習如何使用Kafka 生產者。json

  1. 咱們將演示如何建立KafkaProducer和ProducerRecords對象、
  2. 如何將記錄發送給Kafka
  3. 如何處理從Kafka返回的錯誤
  4. 介紹用於控制生產者行爲的重要配置選項
  5. 深刻探討如何使用不一樣的分區方法和序列化器,以及如何自定義序列化器和分區器。

3.1 生產者概覽

先展現向Kafka發送消息的主要步驟bootstrap

  1. 首先建立一個ProducerRecord對象開始,ProducerRecord 對象須要包含目標主題和要發送的內容,有可能還包含分區信息數組

  2. 把ProducerRecord中的鍵和值序列化成字節數組,這樣它們纔可以在網絡上傳輸

  3. 接下來,數據被傳給分區器

    • 若是以前在ProducerRecord對象裏指定了分區,那麼分區器就不會再作任何事情,直接把指定的分區返回
    • 若是沒有指定分區,那麼分區器會根據ProducerRecord對象的來選擇一個分區
  4. 緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。

  5. 有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。

  6. 服務器在收到這些消息時會返回一個響應

    • 若是消息成功寫入Kafka,就返回一個RecordMetaData對象,它包含了主題分區信息,以及記錄在分區裏的偏移量

    • 若是寫入失敗,則會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗,就返回錯誤信息。

3.2 建立Kafka生產者

下面展現如何建立一個KafkaProducer

Properties kafkaProperties = new Properties();

// fixme: 運行時請修改47.94.139.116:9092爲本身的kafka broker地址
kafkaProperties.put("bootstrap.servers", "47.94.139.116:9092");

kafkaProperties.put("key.serializer",                     "org.apache.kafka.common.serialization.StringSerializer");

kafkaProperties.put("value.serializer",                           "org.apache.kafka.common.serialization.StringSerializer");

kafkaProperties.put("acks", "all");

// 根據配置建立Kafka生產者
KafkaProducer<String, String> kafkaProducer 
    = new KafkaProducer<String, String>(kafkaProperties);

在建立Kafka生產者以前,有3個必選的配置項

配置項 解釋
bootstrap.servers 該屬性指定broker 的地址清單,地址的格式爲host:port, host:port。
key.serializer key.serializer 必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵序列化成字節數組。Kafka 客戶端默認提供了ByteArraySerializer、StringSerializer 和IntegerSerializer
value.serializer 與key.serializer 同樣,value.serializer 指定的類會將值序列化

3.3 發送消息到Kafka

實例化生產者對象後,接下來就能夠開始發送消息了。發送消息主要有如下3 種方式:

  • 發送並忘記(fire-and-forget)
    • 咱們把消息發送給服務器,但並不關心它是否正常到達
    • 大多數狀況下,消息會正常到達,由於Kafka 是高可用的,並且生產者會自動嘗試重發。
    • 不過,使用這種方式有時候也會丟失一些消息
  • 同步發送
    • 咱們使用send() 方法發送消息,它會返回一個Future 對象調用get() 方法進行等待,就能夠知道消息是否發送成功。
  • 異步發送
    • 咱們調用send() 方法,並指定一個回調函數,服務器在返回響應時調用該函數。

下面分別演示這三種方式

3.3.1 發送並忘記

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
	"sun", "s1", "cn dota best dota");

// 發送消息
try{
    kafkaProducer.send(producerRecord);
}catch(Exception e){
    e.printStackTrace();
}
  • 生產者的send() 方法將ProducerRecord對象做爲參數。ProducerRecord中包含目標主題的名字和要發送的對象和對象。
  • 咱們使用生產者的send()方法發送ProducerRecord對象。send() 方法會返回一個包含RecordMetadata 的Future對象,不過由於咱們這裏忽略了返回值,因此沒法知道消息是否發送成功。若是不關心發送結果,那麼可使用這種發送方式。好比,記錄日誌。
  • 雖然咱們忽略返回值的同時也忽略了返回的異常。不過在發送消息以前,生產者仍是有可能發生其餘的異常,這些異常將被拋出。這些異常有多是:
    • SerializationException(說明序列化消息失敗)
    • BufferExhaustedException 或TimeoutException(說明緩衝區已滿)
    • 又或者是InterruptException(說明發送線程被中斷)。

3.3.2 同步發送

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
	"sun", "s1", "cn dota best dota");

// 發送消息
try{
    kafkaProducer.send(producerRecord).get();
}catch(Exception e){
    e.printStackTrace();
}
  • producer.send()方法先返回一個Future對象,而後調用Future 對象的get()方法阻塞當前線程來等待Kafka響應。

    • 若是服務器返回一個錯誤,get() 方法會拋出異常
    • 不然,咱們會獲得一個RecordMetadata對象,能夠經過它獲取消息的主題分區偏移量等信息。
  • 若是在發送消息以前或者在發送消息的過程當中發生了任何錯誤,好比broker 返回了一個不容許重發消息的異常或者已經超過了重發的次數,那麼就會拋出異常

KafkaProducer通常會發生兩類錯誤。

  • 一類是可重試錯誤,這類錯誤能夠經過重發消息來解決。好比對於鏈接錯誤,能夠經過再次創建鏈接來解決,「無主(no leader)」錯誤則能夠經過從新爲分區選舉首領來解決。KafkaProducer 能夠被配置成自動重試,若是在屢次重試後仍沒法解決問題,應用程序會收到一個重試異常
  • 另外一類錯誤沒法經過重試解決,好比「消息太大」異常。對於這類錯誤,KafkaProducer直接拋出異常

3.3.3 異步發送

若是隻發送消息而不等待響應,那麼能夠避免阻塞線程來等待,從而提升發送效率。

大多數時候,咱們並不須要等待響應——儘管Kafka會把目標主題、分區信息和消息的偏移量發送回來,但對於發送端的應用程序來講不是必需的。不過在遇到消息發送失敗時,咱們須要拋出異常記錄錯誤日誌,或者把消息寫入「錯誤消息」文件以便往後分析。

這時咱們能夠爲send()方法註冊一個回調函數,讓它來處理異步調用的返回結果

// 建立ProducerRecord,它是一種消息的數據結構
ProducerRecord<String, String> producerRecord 
    = new ProducerRecord<String, String>("sun", "s1", "cn dota best dota");

// 發送消息
kafkaProducer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            e.printStackTrace();
        }else{
            System.out.println(recordMetadata);
        }   
    }
});
  • 這裏經過註冊一個回調函數處理異步發送的結果
    • 若是錯誤,則onCompletion方法的Exception參數不爲null,咱們能夠針對這個異常進行處理
    • 若是沒有錯誤,RecordMetadata不爲null,咱們能夠從中獲取主題信息、分區信息、偏移量信息

3.4 生產者的配置

生產者還有不少可配置的參數,在Kafka 文檔裏都有說明,它們大部分都有合理的默認值,因此沒有必要去修改它們。不過有幾個參數在內存使用、性能和可靠性方面對生產者影響比較大,接下來咱們會一一說明。

3.4.1 acks

acks參數指定了必須要有多少個分區副本收到消息生產者纔會認爲消息寫入是成功的。

  • 若是acks=0,生產者發送消息以後就馬上認爲消息寫入成功

    • 也就是說,若是服務器沒有收到消息,生產者也無從得知,消息也就丟失了。
    • 不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量
  • 若是acks=1,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應

    • 若是消息沒法到達首領節點(好比首領節點崩潰,新的首領尚未被選舉出來),生產者會收到一個錯誤響應,爲了不數據丟失,生產者會重發消息。
  • 若是acks=all,只有當首領節點全部複製節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應

    • 這種模式是最安全的。
    • 不過,它的延遲比acks=1 時更高,由於咱們要等待不僅一個服務器節點接收消息。

3.4.2 retries(重試次數)

retries 參數的值決定了生產者能夠重發消息的次數,若是達到這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者會在每次重試之間等待100ms。

由於生產者會自動進行重試,因此就不必在代碼邏輯裏處理那些可重試的錯誤。你只須要處理那些不可重試的錯誤或重試次數超出上限的狀況。

3.4.3 batch.size(批次大小)和linger.ms(批次等待時間)

KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。

該參數指定了一個批次可使用的內存大小按照字節數計算(而不是消息個數)。當批次被填滿,批次裏的全部消息會被髮送出去。

該參數指定了生產者在發送批次以前等待更多消息加入批次的時間

3.4.4 max.in.flight.requests.per.connection

該參數指定了生產者收到服務器響應以前能夠發送多少個消息

  • 它的值越高,就會佔用越多的內存,不過也會提高吞吐量。
  • 把它設爲1能夠保證消息是按照發送的順序寫入服務器的,即便發生了重試。

3.5 序列化器

咱們已經在以前的例子裏看到,建立一個生產者對象必須指定序列化器。咱們已經知道如何使用默認的字符串序列化器,Kafka 還提供了整型和字節數組序列化器,不過它們還不足以知足大部分場景的需求。到最後,咱們須要序列化的記錄類型會愈來愈多。

接下來演示如何開發自定義序列化器,並介紹Avro序列化器。若是發送到Kafka的對象 不是簡單的字符串整型,那麼可使用序列化框架來建立消息記錄,如Avro、Thrift 或Protobuf,或者使用自定義序列化器。咱們強烈建議使用通用的序列化框架

3.5.1 自定義序列化器

/**
 * 一個簡單的pojo,爲了演示如何自定義序列化器
 * */
public class Customer {

    private int customerId;

    private String customerName;

    public Customer(int customerId, String customerName) {
        this.customerId = customerId;
        this.customerName = customerName;
    }

    public Customer() {
    }

    public int getCustomerId() {
        return customerId;
    }

    public void setCustomerId(int customerId) {
        this.customerId = customerId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}
/**
 * 自定義序列化器
 * */
public class CustomerSerializer implements Serializer<Customer> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 不須要配置任何
    }

    /**
     * Customer對象的序列化函數,組成以下
     * 前4字節: customerId
     * 中間4字節: customerName字節數組長度
     * 後面n字節: customerName字節數組
     * */
    public byte[] serialize(String topic, Customer data) {
        try{
            byte[] serializedName;
            int stringSize;
            if(data == null){
                return null;
            }else{
                if(data.getCustomerName() != null){
                    serializedName = data.getCustomerName().getBytes("utf-8");
                    stringSize = serializedName.length;
                } else{
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch(Exception e){
            throw new SerializationException(
                "Error when serializing Customer to byte[] " + e);
        }
    }

    public byte[] serialize(String topic, Headers headers, Customer data) {
        return serialize(topic, data);
    }

    public void close() {
        // 不須要關閉任何
    }
}

不過咱們不建議採用自定義序列化器,理由以下

  • 若是咱們有多種類型的消費者,可能須要把customerID 字段變成長整型,或者爲Customer 添加startDate 字段,這樣就會出現新舊消息的兼容性問題
  • 在不一樣版本的序列化器和反序列化器之間調試兼容性問題着實是個挑戰——你須要比較原始的字節數組。
  • 更糟糕的是,若是同一個公司的不一樣團隊都須要往Kafka 寫入Customer 數據,那麼他們就須要使用相同的序列化器,若是序列化器發生改動他們幾乎都要在同一時間修改代碼

3.5.2 使用Avro序列化

Apache Avro是一種與編程語言無關序列化格式

Avro數據經過與語言無關的schema來定義

  • schema經過JSON來描述
  • 數據能夠被序列化成二進制文件或JSON 文件,不過通常會使用二進制文件。
  • Avro 在讀寫文件須要用到schema,schema 通常會被內嵌在數據文件裏。

Avro 有一個頗有意思的特性是,當負責寫消息的應用程序使用了新版本的schema,負責讀消息的應用程序能夠繼續處理消息而無需作任何改動,這個特性使得它特別適合用在像Kafka 這樣的消息系統上。

下面舉個例子

假設咱們有一個v0.0.1的schema

{
"namespace": "customerManagement.avro",
	"type": "record",
	"name": "Customer",
	"fields": [
		{"name": "id", "type": "int"},
		{"name": "name", "type": "string"},
		{"name": "faxNumber", "type": ["null", "string"], "default": "null"} 
	]
}

過了一段時間,咱們須要刪除faxnumber字段(傳真號碼),添加一個email字段,新的schema(v0.0.2)以下

{
"namespace": "customerManagement.avro",
	"type": "record",
	"name": "Customer",
	"fields": [
		{"name": "id", "type": "int"},
		{"name": "name", "type": "string"},
		{"name": "email", "type": ["null", "string"], "default": "null"} 
	]
}

更新到新版的schema 後:

  • 舊記錄仍然包含faxNumber 字段
  • 而新記錄則包含email 字段
  • 部分負責讀取數據的應用程序進行了升級,那麼它們是如何處理這些變化的呢?

消費者升級以前

  • 它們會調用相似getName()、getId() 和getFaxNumber() 這樣的方法。
  • 若是碰到使用新schema 構建的消息,getName() 和getId() 方法仍然可以正常返回,但getFaxNumber() 方法會返回null,由於消息裏不包含傳真號碼。

消費者升級以後

  • getEmail() 方法取代了getFaxNumber() 方法。
  • 若是碰到一個使用舊schema 構建的消息,那麼getEmail() 方法會返回null,由於舊消息不包含郵件地址。

如今能夠看出使用Avro 的好處了:咱們修改了消息的schema,但並不須要更新全部消費者,而這樣仍然不會出現異常或阻斷性錯誤,也不須要對現有數據進行大幅更新。

3.5.3 在Kafka中使用Avro

若是在每條Kafka 記錄裏都嵌入schema,會讓記錄的大小成倍地增長。

  • 可是在讀取記錄時仍然須要用到整個schema,因此要先找到schema。
  • 咱們要使用「schema 註冊表」來達到目的。
  • Kafka中並不包含schema註冊表的實現,如今已經有一些開源的schema註冊表實現,好比:Confluent Schema Registry。
  • 咱們把全部寫入數據須要用到的schema保存在註冊表裏,而後在記錄裏放一個schema的標識符
  • 消費者使用記錄中的標識符從註冊表里拉取schema來反序列化記錄。

3.6 分區

在以前的例子裏,ProducerRecord對象包含了目標主題。Kafka 的消息是一個個鍵值對,ProducerRecord 對象能夠只包含目標主題和值能夠設置爲默認的null,不過大多數應用程序會用到鍵。

鍵有兩個用途:

  1. 能夠做爲消息的附加信息
  2. 也能夠用來決定消息該被寫到主題的哪一個分區。擁有相同鍵的消息將被寫到同一個分區。

若是要建立鍵爲null的消息,不指定鍵就能夠

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");

若是鍵爲null,而且使用了默認的分區器,那麼記錄將被隨機地發送到主題內各個可用的分區上。分區器使用輪詢(Round Robin)算法將消息均衡地分佈到各個分區上。

若是鍵不爲空,而且使用了默認的分區器,那麼Kafka會使用Kafka本身的散列算法對鍵進行散列(使用Kafka 本身的散列算法,即便升級Java 版本,散列值也不會發生變化),而後根據散列值把消息映射到特定的分區上。這裏的關鍵之處在於,同一個鍵老是被映射到同一個分區上

只有在不改變主題分區數量的狀況下,鍵與分區之間的映射才能保持不變。舉個例子,在分區數量保持不變的狀況下,能夠保證用戶045189 的記錄老是被寫到分區34。在從分區讀取數據時,能夠進行各類優化。不過,一旦主題增長了新的分區,這些就沒法保證了——舊數據仍然留在分區34,但新的記錄可能被寫到其餘分區上。若是要使用鍵來映射分區,那麼最好在建立主題的時候就把分區規劃好,並且永遠不要增長新分區。

3.6.1 自定義分區器

默認狀況下,kafka自動建立的主題的分區數量爲1,因此咱們須要先修改分區數量,來讓自定義分區器有點用。

  1. 先cd到opt\kafka\bin\

  2. 運行命令:

    ./kafka-topics.sh --zookeeper 47.94.139.116:2181/kafka --alter --topic sun --partitions 4
  3. 查看是否修改爲功

    ./kafka-topics.sh --describe --zookeeper 47.94.139.116:2181/kafka --topic sun

或者也能夠經過修改server.properties中的nums.partions並重啓,來更改默認分區數量。

下面自定義一個分區器,它根據鍵來劃分分區:若鍵爲Banana則放入最後一個分區,若鍵不爲Banana則散列到其餘分區。

/**
 * 自定義分區器
 * 若鍵爲Banana則放入最後一個分區,若鍵不爲Banana則散列到其餘分區
 * */
public class BananaPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 分區信息列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        //分區數量
        int partitionsAmount = partitions.size();

        // 若是隻有一個分區,那全都放到partition0就完事了
        if(partitionsAmount == 1){
            return 0;
        }

        if(keyBytes == null || ! (key instanceof String)){
            throw new InvalidRecordException("We expect all messages to have customer name as key");
        }

        if(key.equals("Banana")){
            return partitionsAmount - 1;
        }

        return (Math.abs(Utils.murmur2(keyBytes)) % (partitionsAmount - 1));

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

而後,在producer配置中加入kafkaProperties.put("partitioner.class", "cn.edu.neu.demo.ch3.partitioner.BananaPartitioner");便可。

相關文章
相關標籤/搜索