在文章【大數據實踐】遊戲事件處理系統系列文章中中,咱們已經作到使用filebeat收集日誌事件、logstash處理日誌事件、發送日誌事件到kafka集羣,並在消費者中消費的過程。其中,爲kafka集羣生產消息的,是logstash服務,而非咱們自定義的生成者。在本文中,將主要介紹KafkaProducer
類相關的一些接口和理論知識(基於kafka 1.1版本)。html
Package org.apache.kafka.clients.producer public class KafkaProducer<K, V> extends java.lang.Object implements Producer<K, V>
KafkaProducer
類用於向kafka集羣發佈消息記錄,其中<K, V>
爲泛型,指明發送的消息記錄key/value對的類型。構造函數 | 描述 |
---|---|
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs) | 配置信息爲Map形式,構造Producer |
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息爲Map形式,能夠指定自定義的用於序列化key和value的類。 |
KafkaProducer(java.util.Properties properties) | 配置信息放在Properties對象中,構造Producer。如,能夠從配置文件***.properties 中讀取,或者新建Properties對象,再設置配置信息。 |
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息放在Properties對象中,可指定自定義的key和value的序列化類。 |
Producer
對象,配置信息均爲key/value
對。org.apache.kafka.common.serialization
中,提供了許多已經實現好的序列化和反序列化的類,能夠直接使用。你也能夠實現本身的序列化和反序列化類(實現Serializer
接口),選擇合適的構造函數構造你的Producer
類。想用kafka自帶的序列化類,可在配置信息中配置,如:java
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
close
,不然會形成資源泄露的問題。修飾&返回 | 方法 | 描述 |
---|---|---|
void | abortTransaction() | 停止進行中的事務 |
void | beginTransaction() | 開始事務,在開始任何新事務以前,都英應該調用此方法 |
void | close() | 關閉該producer,釋放資源 |
void | close(long timeout, java.util.concurrent.TimeUnit timeUnit) | 等待timeout指定的時長後關閉該producer,以便producer能夠將還未完成發送的消息發送完。timeUnit爲時間單位。若是超時,則強制關閉。 |
void | commitTransaction() | 提交進行中的事務 |
void | flush() | 調用次方法,使kafka生成者發送緩衝區中的消息記錄(record)能夠被當即發送(即便linger.ms 大於0)。而且一直阻塞,直到這些消息記錄都發送完成 |
void | initTransactions() | 當構造producer時,若是配置了transactional.id ,那麼在調用其transaction相關函數以前,都必須先調用該函數 |
java.util.Map<MetricName,? extends Metric> | metrics() | 列出producer中維護的全部內部監控(metrics)設置 |
java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic) | 從指定的主題(topic)中,獲取分區(partition)的元數據 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record) | 異步發送消息記錄(record)到指定的主題 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record, Callback callback) | 異步發送一個消息記錄到指定topic,當發送被確認完成以後,調用回調函數(callback) |
void | sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) | 發送指定的偏移量列表(offsets)到消費者組協調者(consumer group coordinator),而且將這些偏移量(offsets)標記爲當前事務的一部分 |
close()
方法是在回調方法callback
中被調用,那麼kafka將會輸出一條警告日誌,而且將其替換爲close(0, TimeUnit.MILLISECONDS)
,這樣作的目的是爲了不發送線程(sender thread)永遠阻塞。flush()
函數的後置條件(方法順利執行完畢以後必須爲真的條件)是:待發送緩衝區中全部待發送的記錄record都發送完成。發送完成指的是成功收到了在構建producer時設置的acks 配置的確認acks。flush()
調用開始以後發送的消息記錄可以真正完成。能夠經過設置重試配置retries=<large_number>
來下降消息記錄不被送達的狀況。事務性的producer
,不須要調用flush()
函數,由於commitTransaction()
函數在提交事務以前,會將緩衝中的記錄進行flush,這樣能夠確保那些在 beginTransaction()
以前被send(ProducerRecord)
的消息記錄將在事務提交以前完成。beginTransaction()
方法以前,必須調用一次initTransactions()
方法。initTransactions()
方法的主要做用是:apache
initTransactions()
函數會等待它完成。send(ProducerRecord<K,V> record, Callback callback)
方法將在record被送入到待發送的buffer以後,當即返回。所以,可容許並行發送record,而不用阻塞等待每一個record發送完成。該方法的返回值RecordMetadata
爲該record被髮送到的分區partition的元數據
,如偏移量offset,建立時間CreateTime等。要想阻塞等待發送完成,能夠調用Future的get()
方法,如:segmentfault
producer.send(record).get();
上面基本上對KafkaProducer類的主要接口作了解釋,主要參考了官方文檔,從上面的一些方法中,能夠看到kafka的一些特性,如:發送緩衝區機制,事務性producer等,這些複雜概念將在後續文章中再作深刻探索。安全