【大數據實踐】Kafka生產者編程(2)——producer發送流程

前言

在上一篇文章【大數據實踐】Kafka生產者編程(1)——KafkaProducer詳解中,主要對KafkaProducer類中的函數進行了詳細的解釋,但僅針對其中的一些方法,對於producer背後的原理、機制,沒有作深刻講解。所以,在本文章中,嘗試介紹kafka producer整個發送流程。在寫做此文章時,本身也處於對Kafka的學習階段,可能有些細節掌握的並不精確,但願你們指正。java

Producer消息發送流程

clipboard.png

備註:圖片來源:https://blog.csdn.net/zhanglh...算法

構造KafkaProducer對象

上一篇文章中,詳細介紹了KafkaProducer的構造函數,其主要是對producer的一些選項進行配置。配置項可在類ProducerConfig中找到:apache

package org.apache.kafka.clients.producer;

public class ProducerConfig extends AbstractConfig {
...
}

其中,除了能夠配置一些簡單的數值,還能夠配置一些kafka自帶或者咱們自定義的類,如:編程

  • key.serializer:key的序列化類,kafka在包package org.apache.kafka.common.serialization;中實現了一系列經常使用的序列化和反序列化的類。若要自定義序列化類,則須要實現接口org.apache.kafka.common.serialization.Serializer,如Integer的序列化類:segmentfault

    package org.apache.kafka.common.serialization;
    
     import java.util.Map;
    
     public class IntegerSerializer implements Serializer<Integer> {
     public IntegerSerializer() {
     }
    
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
    
     public byte[] serialize(String topic, Integer data) {
         return data == null ? null : new byte[]{(byte)(data.intValue() >>> 24), (byte)(data.intValue() >>> 16), (byte)(data.intValue() >>> 8), data.byteValue()};
     }
    
     public void close() {
     }
     }
  • value.serializer:value的序列化類。
  • partitioner.class:partition分配的類,使消息均勻發送到topic的各個分區partition中,Kafka默認partition爲org.apache.kafka.clients.producer.internals.DefaultPartitioner。若要自定義負載均衡算法,須要實現org.apache.kafka.clients.producer.Partitioner接口。
  • 攔截鏈Interceptors:爲攔截器List,可讓用戶在消息記錄發送以前,或者producer回調方法執行以前,對消息或者回調信息作一些邏輯處理。能夠經過實現org.apache.kafka.clients.producer.ProducerInterceptor接口來定義本身的攔截器。

構造ProducerRecord

ProducerRecord即消息記錄,記錄了要發送給kafka集羣的消息、分區等信息:緩存

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
  • topic:必須字段,表示該消息記錄record發送到那個topic。
  • value:必須字段,表示消息內容。
  • partition:可選字段,要發送到哪一個分區partition。
  • key:可選字段,消息記錄的key,可用於計算選定partition。
  • timestamp:可選字段,時間戳;表示該條消息記錄的建立時間createtime,若是不指定,則默認使用producer的當前時間。
  • headers:可選字段,(做用暫時不明,待再查證補充)。

發送ProducerRecord

異步發送 & 同步發送

異步發送時,直接將消息記錄扔進發送緩衝區,當即返回,有另外的線程負責將緩衝區中的消息發送出去。異步發送時,須要設置callback方法,當收到broker的ack確認時,將調用callback方法。下面直接貼kafka官方例子中,展現的異步和同步發送方法:負載均衡

package kafka.examples;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}

class DemoCallBack implements Callback {

    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
     * non-null.
     *
     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
     *                  occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

攔截鏈攔截處理ProducerRecord

調用send方法時,首先攔截器Interceptor將攔截ProducerRecord,調用Interceptor的onSend方法,對消息記錄進行一些處理,返回處理後的ProducerRecord。異步

對ProducerRecord的Key和Value序列化

調用配置的key 和 value的序列化類,對ProducerRecord的key和value進行序列化,並設置到ProducerRecord中。async

設置ProducerRecord的partition

經過DefaultPartitioner類或者配置項中指定的自定義Partitioner類中的partiton方法,計算出消息要發送到topic中某個分區partition。設置到ProducerRecord中。ide

檢查ProducerRecord長度是否超過限制

根據配置項max.request.sizebuffer.memory進行檢查,超出任何一項就會拋出異常。

設置ProducerRecord時間戳

若是ProducerRecord構建時已經指定時間戳,則用構建時指定的,不然用當前時間。

ProducerRecord放入緩衝區

ProducerRecord放入緩存區(RecordAccumulator維護)時,發往相同topic的相同partition的消息記錄將會被捆綁batch壓縮,壓縮到ProducerBatch中。也就是說,ProducerBatch中可能包含多個ProducerRecord。這樣作的目的是爲了一次請求發送多個record,提升性能。

RecordAccumulator爲每個TopicPartition維護了一個雙端隊列:

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

相同topic的相同partition的ProducerBatch將被放在對應的隊列中。

壓縮策略有:

·NONE:就是不壓縮。
·GZIP:壓縮率爲50%
·SNAPPY:壓縮率爲50%
·LZ4:壓縮率爲50%

喚醒Sender

當一個ProducerBatch已滿或者有新的ProducerBatch到達時,會喚醒真正發送消息記錄的發送線程Sender,將ProducerBatch發送到kafka集羣。

Sender的發送邏輯以下:

  1. 檢查kafka集羣中是否存在要發送的ProducerBatch對應的leader partition,存在則認爲可發送,不存在說明服務端出現了問題,則該Batch暫不發送。
  2. 過濾掉過時的ProducerBatch,對於過時的ProducerBatch,會經過Sensor通知Interceptor發送失敗。
  3. 發送Batch。
  4. 處理髮送結果,調用callback和攔截器的onAcknowledge進行處理。

小結

本文章對producer消息大致發送流程進行一次梳理,其中有一些本身還不是特別懂,也就沒有寫得特別詳細,後續若是有進一步的瞭解,將修改本文進行補充。後面的文章將對發送過程當中構建Producer時,自定義Inteceptor和自定義Partitioner進行介紹。

相關文章
相關標籤/搜索