kafka 系列 -- 3.一、生產者客戶端基本使用

必要的參數

  • bootstrap.servers

該參數爲 broker 地址,不須要所有都填,由於 kafka 會從當前 broker 中獲取其餘 broker 信息。不過爲了某個 broker 掛掉,通常填多個 broker 地址java

  • key.serializer

消息 key 如何序列化apache

  • value.serializer

消息內容如何序列化bootstrap

示例代碼ide

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

生產者攔截器

在消息發送前,對消息進行處理,該動做發生在序列化器分區器以前。編碼

實現 org.apache.kafka.clients.producer.ProducerInterceptor 接口,便可自定義攔截器code

介紹一下接口定義的方法server

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

在消息發送以前,能夠對消息進行處理接口

  • void onAcknowledgement(RecordMetadata metadata, Exception exception

消息被應答以前或者消息發送失敗時被調用get

  • void close()

producer 被關閉時,會調用kafka

kafka 容許配置攔截器鏈,多個攔截器用 , 號隔開便可。

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());

序列化

序列化發生在分區器以前
實現 org.apache.kafka.common.serialization.StringSerializer 接口便可自定義序列化

介紹一下接口定義的方法

  • void configure(Map<String, ?> configs, boolean isKey)

StringSerializer 實現中,用於設置編碼

  • byte[] serialize(String topic, String data)

定義如何序列化

  • void close()

producer 關閉時,被調用

分區器

實現 org.apache.kafka.clients.producer.Partitioner 便可自定義分區器

kafka 可按 key 進行哈希(MurmurHash2),將消息發往同一個分區。若是未指定 key,那麼將會把消息發往隨機的一個分區。

介紹一下接口定義的方法

  • int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)

定義發往哪一個分區;具體的實現可參考DefaultPartitioner

  • void close()

producer 關閉時,被調用

與 RocketMQ 異同

  1. kafka 一致,rocketMQ 容許生產者將消息發送到指定的 'partition' 中
  2. rocketMQ 沒有 序列化器 的概念。消息內容由 rocketMQ 自行序列化
  3. 從我的目前的使用狀況,rocketMQ 也沒有提供相似 攔截器 概念
  4. rocketMQ 提供了 hock 以此在消息發送前,和消息發送後,對消息進行處理

例如:

DefaultMQProducer producer = new DefaultMQProducer("default");

producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
    @Override
    public String hookName() {
        return null;
    }

    @Override
    public void sendMessageBefore(SendMessageContext context) {

    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {

    }
});
相關文章
相關標籤/搜索