不論將kafka做爲何樣的用途,都少不了的向Broker發送數據或接受數據,Producer就是用於向Kafka發送數據。以下:java
pom.xml文件以下:node
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>
建立生產者的時候,咱們須要爲生產者設置一些屬性,其中有三個必選屬性以下:算法
1. bootstrap.servers: 該屬性指定broker 的地址清單,地址的格式爲host:po 忱。清單裏不須要包含全部的broker 地址,生產者會給定的broker 裏查找到其餘broker 的信息。不過建議至少要提供兩個broker 的信息, 一且其中一個若機,生產者仍然可以鏈接到集羣上。apache
2. key.serializer: broker 但願接收到的消息的鍵和值都是字節數組。生產者接口容許使用參數化類型,所以能夠把Java 對象做爲鍵和值發送給broker 。這樣的代碼具備良好的可讀性,不過生產者須要知道如何把這些Java 對象轉換成字節數組。key. serializer必須被設置爲一個實現了org.apache.kafka.common.serialization.StringSerializer接口的類,生產者會使用這個類把鍵對象序列化成字節數組。Kafka 客戶端默認提供了ByteArraySerializer(這個只作不多的事情)、StringSerializer和IntegeSerializer,所以,若是你只使用常見的幾種Java 對象類型,那麼就不必實現本身的序列化器。要注意, key.serializer是必須設置的,就算你打算只發送值內容。bootstrap
3. value.serializer: 與key.serializer同樣,value.serializer指定的類會將值序列化。若是鍵和值都是字符串,可使用與key.serializer同樣的序列化器。若是鍵是整數類型而值是字符串,那麼須要使用不一樣的序列化器。數組
設置屬性代碼以下:安全
Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value容許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
1. 併發並忘記,這是普通的消息發送方式,咱們把消息發送給服務器,但井不關心它是否正常到達。大多數狀況下,消息會正常到達,由於Kafka 是高可用的,並且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些消息。服務器
實現以下:網絡
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value容許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); kafkaProducer.send(record); } }
此時在Kafka中打開內置的消費者消費消息,結果以下,命令以下:併發
kafka-console-consumer.sh --bootstrap-server 47.105.145.123:9092 --topic testTopic --from-beginning
而後,啓動生產者發送消息,結果以下:
這裏啓動了四次消費者,因此有四條消息被消費。
咱們使用send () 方怯發送消息, 它會返回Future對象,調用get () 方法進行等待,就能夠知道悄息是否發送成功。
實現方式以下:
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value容許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); //建立消息對象,第一個爲參數topic,第二個參數爲key,第三個參數爲value ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); //同步發送方式,get方法返回結果 RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get(); System.out.println("broker返回消息發送信息" + metadata); } }
客戶端消費者仍能收到消息,且生產者也能收到返回結果,返回結果以下:
假設消息在應用程序和Kafka 集羣之間一個來回須要lOm s 。若是在發送完每一個消息後都等待迴應,那麼發送100 個消息須要l秒。但若是隻發送消息而不等待響應,那麼發送100 個消息所須要的時間會少不少。大多數時候,咱們並不須要等待響應一一儘管Kafka會把目標主題、分區信息和悄息的偏移量發送回來,但對於發送端的應用程序來講不是必需的。不過在遇到消息發送失敗時,咱們須要拋出異常、記錄錯誤日誌,或者把消息寫入「錯誤消息」文件以便往後分析。
實現以下:
package com.wangx.kafka.client; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerDemo { public static void main(String[] args) { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094"); //配置key-value容許使用參數化類型 kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie); //建立消息對象,第一個爲參數topic,第二個參數爲key,第三個參數爲value final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world"); //異步發送消息。異常時打印異常信息或發送結果 kafkaProducer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.out.println(e.getMessage()); } else { System.out.println("接收到返回結果:" + recordMetadata); } } }); //異步發送消息時必需要flush,不然發送不成功,不會執行回調函數 kafkaProducer.flush(); } }
監聽到的返回信息以下:
生產者還有不少能夠配置的參數,以下:
1. acks:指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。該參數有以下選項。
若是acks=0 , 生產者在成功寫入消息以前不會等待任何來自服務器的響應。也就是說,若是當中出現了問題, 致使服務器沒有收到消息,那麼生產者就無從得知,消息也就丟失了。不過,由於生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。
若是acks=1 ,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。若是消息沒法到達首領節點(好比領導節點奔潰,新的首領尚未被選舉出來),生產者會收到一個錯誤響應,爲了不數據丟失,生產者會重發消息。不過,若是一個沒有收到消息的節點成爲新首領,消息仍是會丟失。這個時候的吞吐量取決於使用的是同步發送仍是異步發送。若是讓發送客戶端等待服務器的響應(經過調用Future對象的ge t ()方法),顯然會增長延遲(在網絡上傳輸一個來回的延遲)。若是客戶端使用回調,延遲問題就能夠獲得緩解,不過吞吐量仍是會受發送中消息數量的限制(好比,生產者在收到服務器響應以前能夠發送多少個消息)。
若是acks=all ,只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。這種模式是最安全的,它能夠保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集羣仍然能夠運行。不過,它的延遲比acks=1時更高,由於咱們要等待不僅一個服務器節點接收消息。
2. buffer.memory: 用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足。這個時候,send ()方法調用要麼被阻塞,要麼拋出異常,取決於如何設置block.on.buffer 參數(在0. 9.0.0 版本里被替換成了l'la x .block.l'ls ,表示在拋出異常以前能夠阻塞一段時間)。
3. compression.type: 默認狀況下,消息發送時不會被壓縮。該參數能夠設置爲snappy 、gzip 或lz4 ,它指定了消息被髮到broker 以前使用哪種壓縮算法進行壓縮。ssnappy壓縮算法由Google發明,它佔用較少的CPU ,卻能提供較好的性能和至關可觀的 壓縮比,若是比較關注性能和網絡帶寬,可使用這種算法。gzip壓縮算法通常會佔用較多的CPU ,但會提供更高的壓縮比,因此若是網絡帶寬比較有限,可使用這種算法。使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向Kafka 發送消息的瓶頸所在。
4. retries: 生產者從服務器收到的錯誤有多是臨時性的錯誤,在這種狀況下, retries 參數的值決定了生產者能夠重發消息次數,若是達到這個次數,生產者會放棄重試並返回錯誤
5. batch.size: 當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏,該參數指定了一個批次可使用的內存大小,按照字節數計算。
6. linger.ms: 該參數指定了生產者在發送批次以前等待更多消息加入批次的時間。
7. client.id: 該參數能夠是任意的字符串,服務器會用它來識別消息的來源,還能夠用在日誌和配額指標裏
8. max.in.flight.requests.per.connection: 該參數指定了生產者在收到服務器晌應以前能夠發送多少個消息
9. timeout.ms 、request.timeout.ms 和metadata.fetch.timeout.ms:request.timeout.ms 指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數據(好比目標分區的首領是誰)時等待服務器返回響應的時間。若是等待響應超時,那麼生產者要麼重試發送數據,要麼返回一個錯誤(拋出異常或執行回調)。timeout.ms指定了broker 等待同步副本返回消息確認的時間,與asks 的配置相匹配一一若是在指定時間內沒有收到同步副本的確認,那麼broker 就會返回一個錯誤。
10. max.block.ms:該參數指定了在調用send () 方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。
11. max.request.size:該參數用於控制生產者發送的請求大小,能夠指能發送的單個消息的最大值,也能夠指單個請求裏面全部消息總的大小。
12. receive.buffer.bytes 和send.buffer.bytes: 這兩個參數分別指定了TCP socket 接收和發送數據包的緩衝區大小,若是它們被設爲-1,就使用操做系統的默認值。