【大數據實踐】Kafka生產者編程(1)——KafkaProducer詳解

前言

在文章【大數據實踐】遊戲事件處理系統系列文章中中,咱們已經作到使用filebeat收集日誌事件、logstash處理日誌事件、發送日誌事件到kafka集羣,並在消費者中消費的過程。其中,爲kafka集羣生產消息的,是logstash服務,而非咱們自定義的生成者。在本文中,將主要介紹KafkaProducer類相關的一些接口和理論知識(基於kafka 1.1版本)。html

KafkaProducer類

Package org.apache.kafka.clients.producer

public class KafkaProducer<K, V> extends java.lang.Object implements Producer<K, V>
  • KafkaProducer類用於向kafka集羣發佈消息記錄,其中<K, V>爲泛型,指明發送的消息記錄key/value對的類型。
  • kafka producer(生產者)是線程安全的,多個線程共享一個producer實例,相比於多個producer實例,這樣作的效率更高、更快。

構造函數

構造函數 描述
KafkaProducer​(java.util.Map<java.lang.String,java.lang.Object> configs) 配置信息爲Map形式,構造Producer
KafkaProducer​(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) 配置信息爲Map形式,能夠指定自定義的用於序列化key和value的類。
KafkaProducer​(java.util.Properties properties) 配置信息放在Properties對象中,構造Producer。如,能夠從配置文件***.properties中讀取,或者新建Properties對象,再設置配置信息。
KafkaProducer​(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) 配置信息放在Properties對象中,可指定自定義的key和value的序列化類。
  • 從上述構造函數能夠看出,能夠經過Map和Properties兩種形式傳遞配置信息,用於構造Producer對象,配置信息均爲key/value對。
  • 可配置的信息可參見官方配置表producercofigs,其中Value能夠是String類型,也能夠是其餘合適的類型。
  • kafka的包org.apache.kafka.common.serialization中,提供了許多已經實現好的序列化反序列化的類,能夠直接使用。你也能夠實現本身的序列化和反序列化類(實現Serializer接口),選擇合適的構造函數構造你的Producer類。
  • 想用kafka自帶的序列化類,可在配置信息中配置,如:java

    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • producer被建立以後,使用完以後,必定要記得將其close,不然會形成資源泄露的問題。

方法

修飾&返回 方法 描述
void abortTransaction​() 停止進行中的事務
void beginTransaction​() 開始事務,在開始任何新事務以前,都英應該調用此方法
void close​() 關閉該producer,釋放資源
void close​(long timeout, java.util.concurrent.TimeUnit timeUnit) 等待timeout指定的時長後關閉該producer,以便producer能夠將還未完成發送的消息發送完。timeUnit爲時間單位。若是超時,則強制關閉。
void commitTransaction​() 提交進行中的事務
void flush() 調用次方法,使kafka生成者發送緩衝區中的消息記錄(record)能夠被當即發送(即便linger.ms大於0)。而且一直阻塞,直到這些消息記錄都發送完成
void initTransactions​() 當構造producer時,若是配置了transactional.id,那麼在調用其transaction相關函數以前,都必須先調用該函數
java.util.Map<MetricName,? extends Metric> metrics​() 列出producer中維護的全部內部監控(metrics)設置
java.util.List<PartitionInfo> partitionsFor​(java.lang.String topic) 從指定的主題(topic)中,獲取分區(partition)的元數據
java.util.concurrent.Future<RecordMetadata> send​(ProducerRecord<K,V> record) 異步發送消息記錄(record)到指定的主題
java.util.concurrent.Future<RecordMetadata> send​(ProducerRecord<K,V> record, Callback callback) 異步發送一個消息記錄到指定topic,當發送被確認完成以後,調用回調函數(callback)
void sendOffsetsToTransaction​(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) 發送指定的偏移量列表(offsets)到消費者組協調者(consumer group coordinator),而且將這些偏移量(offsets)標記爲當前事務的一部分
  • 若是close()方法是在回調方法callback中被調用,那麼kafka將會輸出一條警告日誌,而且將其替換爲close(0, TimeUnit.MILLISECONDS),這樣作的目的是爲了不發送線程(sender thread)永遠阻塞。
  • flush()函數的後置條件(方法順利執行完畢以後必須爲真的條件)是:待發送緩衝區中全部待發送的記錄record都發送完成。發送完成指的是成功收到了在構建producer時設置的acks 配置的確認acks
  • 當一個線程阻塞於flush調用(等待其完成)時,其餘線程能夠繼續發送消息記錄record,但不保證這些在flush()調用開始以後發送的消息記錄可以真正完成。能夠經過設置重試配置retries=<large_number>來下降消息記錄不被送達的狀況。
  • 對於事務性的producer,不須要調用flush()函數,由於commitTransaction() 函數在提交事務以前,會將緩衝中的記錄進行flush,這樣能夠確保那些在 beginTransaction() 以前被send(ProducerRecord)的消息記錄將在事務提交以前完成。
  • 在第一次調用beginTransaction()方法以前,必須調用一次initTransactions()方法。
  • initTransactions()方法的主要做用是:apache

    • 確保被其餘producer實例初始化的、具備相同transactional.id的事務被完成。若是這些事務在進行過程當中失敗,則事務被停止;若是事務已經開始,但還沒完成,那麼initTransactions()函數會等待它完成。
    • 獲取內部producer的id和epoch,用於後續該producer產生的事務性消息。
  • send​(ProducerRecord<K,V> record, Callback callback)方法將在record被送入到待發送的buffer以後,當即返回。所以,可容許並行發送record,而不用阻塞等待每一個record發送完成。該方法的返回值RecordMetadata爲該record被髮送到的分區partition元數據,如偏移量offset,建立時間CreateTime等。要想阻塞等待發送完成,能夠調用Future的get()方法,如:segmentfault

    producer.send(record).get();

小結

上面基本上對KafkaProducer類的主要接口作了解釋,主要參考了官方文檔,從上面的一些方法中,能夠看到kafka的一些特性,如:發送緩衝區機制,事務性producer等,這些複雜概念將在後續文章中再作深刻探索。安全

相關文章
相關標籤/搜索