Kafka2.0生產者客戶端使用

1 初始化配置

  Kafka 經過 KafkaProducer 構造器初始化生產者客戶端的配置。
  經常使用的重要配置,詳見官網html

  • bootstrap.servers:Kafka 集羣地址(host1:post,host2:post),Kafka 客戶端初始化時會自動發現地址,因此能夠不填寫全部地址。
  • key.serializer:實現了 Kafka 序列化接口的類,用來序列化 key。
  • value.serializer:實現了 Kafka 序列化接口的類,用來序列化 value。
  • acks:leader 接收到的 follower 確認的數量須要知足 acks 的配置。
     0:生產者把消息發送出去就認爲發送完成了。
     1:leader 接收到消息後,不用等 follower 的確認,就表示發送完成了。
     all/-1:leader 接收到消息後,須要全部在 ISR 集合的 follower 確認後,才表示完成了。
  • retries:消息發送失敗後的重試次數。若是容許重試,而 max.in.flight.requests.per.connection>1,則可能致使消息亂序,由於若是把兩批消息發送到同一個分區,第一批失敗並重試,而第二批成功了,則第二批消息可能先生成了。
  • retry.backoff.ms:消息重試發送的間隔。
  • client.id:標識客戶端的 id。
  • compression.type:壓縮類型。可選:none、gzip、snappy、lz4。
  • buffer.memory:記錄累加器能夠使用的最大內存緩衝池大小。
  • batch.size:內存緩衝池的緩衝列表大小。當 batch 的大小超過 batch.size 或者時間達到 linger.ms 就會發送 batch。
  • transactional.id:事務 ID。
// 基礎配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集羣
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

2 構造消息

  Kafka 提供了6種構造器來構造消息。apache

  • topic:消息主題,必填;
  • partition:分區號,非必填。若是爲空,會計算 key 的 hash 值,再和該主題的分區總數取餘獲得分區號;若是 key 也爲空,客戶端會生成遞增的隨機整數,再和該主題的分區總數區域獲得分區號。
  • timestamp:時間戳,非必填。若是爲空,默認爲 KafkaProducer 構造器初始化的時間。
  • key:消息 key,非必填。關係到分區分配,broker 會對帶 key 的消息進行日誌壓縮。
  • value:消息內容,必填。
  • headers:消息頭,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);

3 發送消息

  支持同步發送和異步發送消息。bootstrap

  同步發送app

producer.send(record).get();

  異步發送異步

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 回調處理流程
    }
});
相關文章
相關標籤/搜索