根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具備以下特徵:java
KafkaProducer 是線程安全的,能夠被多個線程交叉使用。算法
KafkaProducer 內部包含一個緩存池,存放待發送消息,即 ProducerRecord 隊列,與此同時會開啓一個IO線程將 ProducerRecord 對象發送到 Kafka 集羣。apache
KafkaProducer 的消息發送 API send 方法是異步,只負責將待發送消息 ProducerRecord 發送到緩存區中,當即返回,並返回一個結果憑證 Future。bootstrap
acks
KafkaProducer 提供了一個核心參數 acks 用來定義消息「已提交」的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值以下:api
0
表示生產者不關係該條消息在 broker 端的處理結果,只要調用 KafkaProducer 的 send 方法返回後即認爲成功,顯然這種方式是最不安全的,由於 Broker 端可能壓根都沒有收到該條消息或存儲失敗。緩存
all 或 -1
表示消息不只須要 Leader 節點已存儲該消息,而且要求其副本(準確的來講是 ISR 中的節點)所有存儲才認爲已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,固然性能也最低。安全
1
表示消息只須要寫入 Leader 節點後就能夠向客戶端返回提交成功。app
retries
kafka 在生產端提供的另一個核心屬性,用來控制消息在發送失敗後的重試次數,設置爲 0 表示不重試,重試就有可能形成消息在發送端的重複。負載均衡
batch.size
kafka 消息發送者爲每個分區維護一個未發送消息積壓緩存區,其內存大小由batch.size指定,默認爲 16K。
但若是緩存區中不足100條,但發送線程此時空閒,是須要等到緩存區中積滿100條才能發送仍是能夠當即發送呢?默認是當即發送,即 batch.size 的做用實際上是客戶端一次發送到broker的最大消息大小。異步
linger.ms
爲了提升 kafka 消息發送的高吞吐量,即控制在緩存區中未積滿 batch.size 時來控制 消息發送線程的行爲,是當即發送仍是等待必定時間,若是linger.ms 設置爲 0表示當即發送,若是設置爲大於0,則消息發送線程會等待這個值後纔會向broker發送。該參數會增長響應時間,但有利於增長吞吐量。有點相似於 TCP 領域的 Nagle 算法。
buffer.memory
用於控制消息發送者緩存的總內存大小,若是超過該值,往緩存區中添加消息會被阻塞,具體會在下文的消息發送流程中詳細介紹,阻塞的最大時間可經過參數 max.block.ms 設置,阻塞超過該值會拋出超時異常。
key.serializer
指定 key 的序列化處理器。
value.serializer
指定 消息體的序列化處理器。
enable.idempotence
從 kafka0.11版本開始,支持消息傳遞冪等,能夠作到消息只會被傳遞一次,經過 enable.idempotence 爲 true 來開啓。若是該值設置爲 true,其 retries 將設置爲 Integer.MAX_VALUE,acks 將被設置爲 all。爲了確保消息發送冪等性,必須避免應用程序端的任何重試,而且若是消息發送API若是返回錯誤,應用端應該記錄最後成功發送的消息,避免消息的重複發送。
從Kafka 0.11開始,kafka 也支持事務消息。
在 Kafka 中,生產者經過接口 Producer 定義,經過該接口的方法,咱們基本能夠得知 KafkaProducer 將具有以下基本能力:
void initTransactions()
初始化事務,若是須要使用事務方法,該方法必須首先被調用。
void beginTransaction()
開啓事務。
void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
向消費組提交當前事務中的消息偏移量,將在介紹 Kafka 事務相關文章中詳細介紹。
void commitTransaction()
提交事務。
void abortTransaction()
回滾事務。
Future< RecordMetadata> send(ProducerRecord
record)Future< RecordMetadata> send(ProducerRecord
record, Callback callback)void flush()
忽略 linger.ms 的值,直接喚醒發送線程,將緩衝區中的消息所有發送到 broker。
List< PartitionInfo> partitionsFor(String topic)
獲取 topic 的路由信息(分區信息)。
Map< MetricName, ? extends Metric> metrics()
獲取由生產者收集的統計信息。
void close()
關閉發送者。
void close(Duration timeout)
定時關閉消息發送者。
上面的方法咱們會根據須要在後續文章中進行詳細的介紹。接下來咱們看一下 KafkaProducer 的核心屬性的含義。
String clientId
客戶端ID。在建立 KafkaProducer 時可經過 client.id 定義 clientId,若是未指定,則默認 producer- seq,seq 在進程內遞增,強烈建議客戶端顯示指定 clientId。
Metrics metrics
度量的相關存儲容器,例如消息體大小、發送耗時等與監控相關的指標。
Partitioner partitioner
分區負載均衡算法,經過參數 partitioner.class 指定。
int maxRequestSize
調用 send 方法發送的最大請求大小,包括 key、消息體序列化後的消息總大小不能超過該值。經過參數 max.request.size 來設置。
long totalMemorySize
生產者緩存所佔內存的總大小,經過參數 buffer.memory 設置。
Metadata metadata
元數據信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。
RecordAccumulator accumulator
消息記錄累積器,將在消息發送部分詳細介紹。
Sender sender
用於封裝消息發送的邏輯,即向 broker 發送消息的處理邏輯。
Thread ioThread
用於消息發送的後臺線程,一個獨立的線程,內部使用 Sender 來向 broker 發送消息。
CompressionType compressionType
壓縮類型,默認不啓用壓縮,可經過參數 compression.type 配置。可選值:none、gzip、snappy、lz四、zstd。
Sensor errors
錯誤信息收集器,當成一個 metrics,用來作監控的。
Time time
用於獲取系統時間或線程睡眠等。
Serializer< K> keySerializer
用於對消息的 key 進行序列化。
Serializer< V> valueSerializer
對消息體進行序列化。
ProducerConfig producerConfig
生產者的配置信息。
long maxBlockTimeMs
最大阻塞時間,當生產者使用的緩存已經達到規定值後,此時消息發送會阻塞,經過參數 max.block.ms 來設置最多等待多久。
ProducerInterceptors
interceptorsApiVersions apiVersions
維護 api 版本的相關元信息,該類只能在 kafka 內部使用。
TransactionManager transactionManager
kafka 消息事務管理器。
TransactionalRequestResult initTransactionsResult
kafka 生產者事務上下文環境初始結果。
通過上面的梳理,詳細讀者朋友對 KafkaProducer 消息生產者有了一個大概的認識,下一篇會重點介紹消息發送流程。接下來咱們以一個簡單的示例結束本文的學習。
package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 100; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
RecordMetadata recordMetadata = future.get();
System.out.printf("offset:" + recordMetadata.offset());
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
本文就介紹到這裏,其主要的目的是瞭解Kafka 的 Producer,引出後續須要學習的內容,下一篇將重點講述 Kafka 消息的發送流程,敬請關注。
若是本文對你們有所幫助的話,麻煩幫忙點個【在看】,謝謝。
https://mp.weixin.qq.com/s/rUJSctU8qdGL-7ri_9Kgkw