在上文中介紹了AdminClient API的使用,如今咱們已經知道如何在應用中經過API去管理Kafka了。但在大多應用開發中,咱們最常面臨的場景就是發送消息到Kafka,或者從Kafka中消費消息,也就是典型的生產/消費模式。而本文將要演示的就是如何使用Producer API將消息發送至Kafka中,使應用成爲一個生產者。java
Producer API具備如下幾種發送模式:算法
接下來,使用一個簡單的例子演示一下異步向Kafka發送消息。首先,咱們須要建立一個Producer
實例,而且必須配置三個參數,分別是Kafka服務的ip地址及端口號,以及消息key和value的序列化器(消息體以key-value結構形式存在)。apache
在本例中,消息的key和value均爲String
類型,因此使用StringSerializer
這個字符串類型的序列化器。代碼示例:安全
/** * 建立Producer實例 */ public static Producer<String, String> createProducer() { Properties properties = new Properties(); // 指定Kafka服務的ip地址及端口號 properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 指定消息key的序列化器 properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 指定消息value的序列化器 properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<>(properties); }
在new KafkaProducer
時,構造器裏作了什麼:bash
Properties
裏的配置項,初始化ProducerConfig
ProducerConfig
初始化一些配置字段MetricConfig
監控度量指標配置以及MetricsReporter
報告器列表和Metrics
存儲庫partitioner
負載均衡器,當有多個partition時就是經過這個負載均衡器去將消息均勻的分發到不一樣的partition中RecordAccumulator
,一個相似於計數器的東西,用於計算消息批次的。由於Producer
並非接收到一條消息就發送到一條消息,而是達到必定批量後按批次發送的,因此須要有一個計數器來存儲和計算批次。Sender
,而後會爲其建立一個守護線程,並啓動Tips:app
KafkaProducer
構造器的源碼,就會發現其全部的屬性都是final
的,而且均在構造器中完成了初始化,不存在不安全的發佈或共享變量,這也就變相說明了KafkaProducer
是線程安全的而後調用Producer
中的send
方法便可實現異步發送。代碼示例:負載均衡
/** * 演示Producer異步發送 */ public static void producerAsyncSend() { String topicName = "MyTopic"; String key = "test-key"; String value = "this is test message!"; try (Producer<String, String> producer = createProducer()) { // 構建消息對象 ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); // 發送一條消息 producer.send(record); } }
在producer.send(record)
裏主要作了如下事情:異步
accumulator.append
向批次中追加消息sender.wakeup
在守護線程中去發送消息大體時序圖以下:
ide
發送消息的具體流程圖以下:
函數
send
方法會有一個Future
類型的返回值,當咱們調用Future
的get
方法時,就會阻塞當前線程,此時就達到了異步阻塞發送消息的效果,即發送消息是異步的,獲取結果是阻塞的。咱們能夠經過這種方式去獲取Future
裏存儲的元數據信息。代碼示例:
/** * 演示Producer異步阻塞式發送 */ public static void producerAsyncBlockSend() throws Exception { String topicName = "MyTopic"; String key = "test-key"; String value = "this is test message!"; try (Producer<String, String> producer = createProducer()) { // 構建消息對象 ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); // 發送一條消息 Future<RecordMetadata> future = producer.send(record); // 調用get時會阻塞當前線程,就能實現異步阻塞式地發送 // 其實發送完就立刻get已經同等於同步的效果了 RecordMetadata metadata = future.get(); System.out.println(String.format( "hasTimestamp: %s, timestamp: %s, hasOffset: %s, offset: %s, partition: %s, topic: %s", metadata.hasTimestamp(), metadata.timestamp(), metadata.hasOffset(), metadata.offset(), metadata.partition(), metadata.topic() )); } }
運行以上代碼,控制檯輸出內容以下:
hasTimestamp: true, timestamp: 1589637627231, hasOffset: true, offset: 5, partition: 1, topic: MyTopic
若是想要在發送完消息後獲取結果,比起直接調用Future
的get
方法更好的方式是使用異步回調的消息發送形式。
在send
方法中支持傳入一個回調函數,當消息發送完畢後,會調用回調函數並將結果看成參數傳入,此時咱們就能夠在回調函數中對結果進行處理。代碼示例:
/** * 演示Producer異步回調發送 */ public static void producerAsyncCallbackSend() throws Exception { String topicName = "MyTopic"; String key = "test-key"; String value = "this is test message!"; try (Producer<String, String> producer = createProducer()) { // 構建消息對象 ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value); // 發送一條消息,傳入一個回調函數,當消息發送完成後會調用傳入的回調函數 producer.send(record, (metadata, err) -> { if (err != null) { err.printStackTrace(); } System.out.println(String.format( "hasTimestamp: %s, timestamp: %s, hasOffset: %s, offset: %s, partition: %s, topic: %s", metadata.hasTimestamp(), metadata.timestamp(), metadata.hasOffset(), metadata.offset(), metadata.partition(), metadata.topic() )); }); } }
運行以上代碼,控制檯輸出內容以下:
hasTimestamp: true, timestamp: 1589639553024, hasOffset: true, offset: 7, partition: 1, topic: MyTopic
在某些特殊的業務場景下咱們常常會有自定義負載均衡算法的需求,在Kafka中能夠經過實現Partitioner
接口來自定義Partition負載均衡器。
本例中所實現的負載均衡算法比較簡單,就是使用key
的hashcode
去對partition
的數量進行取餘得出partition
的索引,代碼示例:
package com.zj.study.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 自定義Partition負載均衡器 * * @author 01 * @date 2020-05-17 **/ public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partitionsNum = cluster.partitionsForTopic(topic).size(); return key.hashCode() % partitionsNum; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
而後在建立Producer
實例時,指定MyPartitioner
的包名路徑便可。代碼示例:
/** * 建立Producer實例 */ public static Producer<String, String> createProducer() { Properties properties = new Properties(); ... // 指定自定義的Partition負載均衡器 properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG , "com.zj.study.kafka.producer.MyPartitioner"); return new KafkaProducer<>(properties); }
咱們首先要了解一下消息的傳遞語義,通常存在三種類型語義:
在Kafka中主要經過消息重發和ACK機制來保障消息的傳遞,消息重發機制主要是提升消息發送的成功率,並不能保證消息必定能發送成功。咱們能夠經過在建立Producer
實例時,設置retries
配置項來開啓或關閉消息重發機制,代碼示例:
// 設置的值爲0表示關閉,大於0則表示開啓 properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
另外一個消息傳遞保障機制就是ACK機制,Kafka中的ACK機制有三種模式,須要經過配置去指定。這三種配置的含義以下:
一樣的該配置項能夠在建立Producer
實例時進行設置,代碼示例:
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
上面的三種取值能夠根據實際的業務場景來進行設置,消息的可靠性越強的,性能確定就會越差。這三種取值就是在消息的可靠性以及性能兩個方面作一個權衡:
acks=0
acks=1
acks=all