【大數據實踐】Kafka生產者編程(3)——Interceptor & Partitioner

前言

在上一篇文章【大數據實踐】Kafka生產者編程(2)——producer發送流程中,對自定義Interceptor和自定義Partitioner作了簡單介紹,沒有作深刻講解。所以,在本文章中,嘗試補充介紹Interceptor和Partitioner的一些理論知識,並介紹如何自定義者兩個類。java

Producer攔截器(interceptor)和攔截鏈

實現接口

攔截器(interceptor)可讓用戶在消息記錄發送以前,或者producer回調方法執行以前,對消息或者回調信息作一些邏輯處理。攔截器實現瞭如下接口:算法

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}
  • onSend():onSend函數將會在消息記錄被髮送以前被調用,它能夠對ProducerRecord作一些處理,返回處理以後的ProducerRecord
  • onAcknowledgement():onAcknowledgement方法將在send時指定的回調函數執行以前被調用,可對執行結果進行一些處理。
  • close():close方法將在執行producer.close()的時候被調用,能夠釋放資源等。

攔截鏈ProducerInterceptors

攔截鏈(ProducerInterceptors)包含了一個由多個攔截器組裝起來的攔截器列表List<ProducerInterceptor<K, V>> ,在producer發送消息,消息迴應以及close時,攔截鏈的onSend、onAcknowledgement、close方法會被調用,而這些方法中,會逐一調用每一個攔截器的onSend、onAcknowledgement、close方法。就像是生成流水線上,各個處理程序同樣。apache

攔截鏈類所在位置:編程

package org.apache.kafka.clients.producer.internals;

public class ProducerInterceptors<K, V> implements Closeable {}

自定義攔截器

自定義一個計數攔截器,以下:segmentfault

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<Integer, String> {
    public int sendCounter = 0;
    public int succCounter = 0;
    public int failCounter = 0;

    public void configure(Map<String, ?> configs) {

    }

    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("onSend called in CounterInterceptor, key = " + record.key());
        sendCounter++;
        return  record;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception exception) {
        if (exception == null) {
            System.out.println("record send ok. topic = " + recordMetadata.topic() + "partion = " + recordMetadata.partition());
            succCounter++;
        } else {
            System.out.println("record send failed. topic = " + recordMetadata.topic() + "partion = " + recordMetadata.partition());
            failCounter++;
        }

    }

    public void close() {
        System.out.println("sendCounter = " + sendCounter + " succCounter = " + succCounter + " failCounter = " + failCounter);
    }

}

將攔截器裝配到自定義的Producer中:數組

package myproducers; 

/**
 * kafka消息生產者——
 */

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;


public class GameRecordProducer {
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;

    public GameRecordProducer() {}


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myproducers.GameRecordProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        List<String> intercepters = new ArrayList<String>();
        intercepters.add("myproducers.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, intercepters);
        KafkaProducer<Integer, String> producer;
        producer = new KafkaProducer<Integer, String>(props);


        try {
            producer.send(new ProducerRecord<Integer,String>("game-score","message1")).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

消息記錄類ProducerRecord

消息記錄類中,記錄了須要發送的消息內容以及要發送到的主題、分區等內容。類的定義以下:dom

package org.apache.kafka.clients.producer;

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
  • topic:必須字段,表示該消息記錄record發送到那個topic。
  • value:必須字段,表示消息內容。
  • partition:可選字段,要發送到哪一個分區partition。函數

    • 若是record中設置了partition,則發往該partition;
    • 若是沒有設置partition,但指定key值,則會根據key序列化以後的字節數組的hashcode進行取模運算,獲得partition。
    • 若是沒有設置partition和key,則producer會採用迭代方式(相似於隨機數)。
  • key:可選字段,消息記錄的key,可用於計算選定partition。
  • timestamp:可選字段,時間戳;表示該條消息記錄的建立時間createtime,若是不指定,則默認使用producer的當前時間。
  • headers:可選字段。

默認partition算法

kafka producer的partition制定策略爲:大數據

  • 若是record中設置了partition,則發往該partition;
  • 若是沒有設置partition,但指定key值,則會根據key序列化以後的字節數組的hashcode進行取模運算,獲得partition。
  • 若是沒有設置partition和key,則producer會採用相似於輪詢方式(但不是嚴格輪詢,而是相似於隨機數)。

具體算法源代碼以下:this

package org.apache.kafka.clients.producer.internals;

import ...

public class DefaultPartitioner implements Partitioner {

    // ...
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 從集羣中獲取該topic分區列表及分區數量。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
       
        if (keyBytes == null) {
            // 沒有指定key值,及key值序列化以後爲null,則獲取下一個可用的partition值
            int nextValue = this.nextValue(topic);
            // 獲取該topic可用的分區列表
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                // 可用分區列表大於0時,
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                // toPositive:確保爲正數,Math.abs(Integer.MIN_VALUE)爲負數,因此不能用。
                // toPositive(Integer.MIN_VALUE) == 0 
                // toPositive(-1) == 2147483647 
                // 取餘
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 使用murmur2 hash算法,求得值,在取餘
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // 獲取下一個值
    private int nextValue(String topic) {
        
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    ...
}

自定義Partitioner

除了使用默認的Partitioner以外,也可使用自定義的Partitioner,已實現更好的分區均衡。

package myproducers;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class ConstantPartioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 固定永遠返回1,即所有放在1分區
        return 1;
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {
    }
}

在構建KafkaProducer對象時,在配置信息中,將自定義的Partitioner類配置進去:

kafkaProps.put("partitioner.class", "myproducer.ConstantPartitioner");

小結

本文章介紹了kafka producer中兩個比較獨立概念,在實際開發過程當中,可做爲咱們程序的擴展點。後一篇文章將繼續圍繞KafkaProducer的配置細節進行分析,以瞭解Kafka發送過程當中的更多的細節和機制。

相關文章
相關標籤/搜索