kafka生產者發送流程:java
kafka是經過異步的方式進行的消息發送流程,爲何是異步的?數組
主線程->構建ProducerRecord對象,這個對象聲明瞭主題Topic、分區Partition、鍵 Key以及 值 Value,主題和值是必需要聲明的,分區和鍵能夠不用指定。 主線程->調用send發送進行消息發送。(由於消息要在網絡上傳輸,因此必須進行序列化) 主線程->序列化器:將key和value序列化成字節數組。 主線程->分區器,根據參數進行分區選擇,參數裏有分區則指定分區,無分區有key值則經過key計算出分區,既沒有分區也沒有key值,則經過輪詢方式進行指定分區。 主線程->,將消息放入一個RecordAccumulator(消息收集器,也能夠理解爲主線程與Sender線程 直接的緩衝區)中暫存. 異步線程->Sender線程負責將消息信息構成請求,並最終執行網絡I/O的線程,它從 RecordAccumulator中取出消息並批量發送出去 Broker成功接收到消息,表示發送成功,返回消息的元數據(主題、分區、偏移量)。發送失敗能夠選擇重試或跑出異常。
須要注意的是,KafkaProducer是線程安全的,多個 線程間能夠共享使用同一個KafkaProducer對象安全
發送類型:網絡
一、發送即忘記:經過Producer對象調用send方法,發送完成後對響應結果不作任何處理。異步
二、同步發送:經過Producer對象調用send方法返回一個Future對象,而後調用Future對象的get方法等待kafka的響應,若是kafka正常響應,返回一個RecordMetadata對象,該對象爲消息的元數據對象(主題、分區、偏移量);若是kafka發生錯誤,沒法正常響應,就會拋出異常,咱們即可以進行異常處理 (try/catch包圍)。線程
三、異步發送:code
producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println(metadata.partition() + ":" + metadata.offset()); } } })