在第一篇博客咱們瞭解到一個kafka系統,一般是生產者Producer 將消息發送到 Broker,而後消費者 Consumer 去 Broker 獲取,那麼本篇博客咱們來介紹什麼是生產者Producer。html
咱們知道一個系統在運行過程當中會有不少消息產生,好比前面說的對於一個購物網站,一般會記錄用戶的活動,網站的運行度量指標以及一些日誌消息等等,那麼產生這些消息的組件咱們均可以稱爲生產者。java
而對於生產者產生的消息重要程度又有不一樣,是否都很重要不容許丟失,是否容許丟失一部分?以及是否有嚴格的延遲和吞吐量要求?算法
對於這些場景在 Kafka 中會有不一樣的配置,以及不一樣的 API 使用。apache
下圖是生產者向 Kafka 發送消息的主要步驟:bootstrap
①、首先要構造一個 ProducerRecord 對象,該對象能夠聲明主題Topic、分區Partition、鍵 Key以及值 Value,主題和值是必需要聲明的,分區和鍵能夠不用指定。數組
②、調用send() 方法進行消息發送。安全
③、由於消息要到網絡上進行傳輸,因此必須進行序列化,序列化器的做用就是把消息的 key 和 value對象序列化成字節數組。服務器
④、接下來數據傳到分區器,若是之間的 ProducerRecord 對象指定了分區,那麼分區器將再也不作任何事,直接把指定的分區返回;若是沒有,那麼分區器會根據 Key 來選擇一個分區,選擇好分區以後,生產者就知道該往哪一個主題和分區發送記錄了。網絡
⑤、接着這條記錄會被添加到一個記錄批次裏面,這個批次裏全部的消息會被髮送到相同的主題和分區。會有一個獨立的線程來把這些記錄批次發送到相應的 Broker 上。app
③、Broker成功接收到消息,表示發送成功,返回消息的元數據(包括主題和分區信息以及記錄在分區裏的偏移量)。發送失敗,能夠選擇重試或者直接拋出異常。
首先在POM 文件中導入 kafka client。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
實例代碼:
1 package com.ys.utils; 2 3 import org.apache.kafka.clients.producer.*; 4 import java.util.Properties; 5 6 /** 7 * Create by YSOcean 8 */ 9 public class KafkaProducerUtils { 10 11 public static void main(String[] args) { 12 Properties kafkaProperties = new Properties(); 13 //配置broker地址信息 14 kafkaProperties.put("bootstrap.servers", "192.168.146.200:9092,192.168.146.201:9092,192.168.146.202:9092"); 15 //配置 key 的序列化器 16 kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 17 //配置 value 的序列化器 18 kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 19 20 //經過上面的配置文件生成 Producer 對象 21 Producer producer = new KafkaProducer(kafkaProperties); 22 //生成 ProducerRecord 對象,並制定 Topic,key 以及 value 23 ProducerRecord<String,String> record = 24 new ProducerRecord<String, String>("testTopic","key1","hello Producer"); 25 //發送消息 26 producer.send(record); 27 } 28 }
經過運行上述代碼,咱們向名爲 testTopic 的主題中發送了一條鍵爲 key1,值爲 hello Producer 的消息。
在上面的實例中,咱們配置了以下三個屬性:
①、bootstrap.servers:該屬性指定 brokers 的地址清單,格式爲 host:port。清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找到其它 broker 的信息。——建議至少提供兩個 broker 的信息,由於一旦其中一個宕機,生產者仍然可以鏈接到集羣上。
②、key.serializer:將 key 轉換爲字節數組的配置,必須設定爲一個實現了 org.apache.kafka.common.serialization.Serializer 接口的類,生產者會用這個類把鍵對象序列化爲字節數組。——kafka 默認提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。固然也能夠自定義序列化器。
③、value.serializer:和 key.serializer 同樣,用於 value 的序列化。
以上三個屬性是必需要配置的,下面還有一些別的屬性能夠不用配置,默認。
④、acks:此配置指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的,這個參數保障了消息發送的可靠性。默認值爲 1。
1、acks=0。生產者不會等待服務器的反饋,該消息會被馬上添加到 socket buffer 中並認爲已經發送完成。也就是說,若是發送過程當中發生了問題,致使服務器沒有接收到消息,那麼生產者也沒法知道。在這種狀況下,服務器是否收到請求是無法保證的,而且參數retries也不會生效(由於客戶端沒法得到失敗信息)。每一個記錄返回的 offset 老是被設置爲-1。好處就是因爲生產者不須要等待服務器的響應,因此它能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。
2、acks=1。只要集羣首領收到消息,生產者就會收到一個來自服務器的成功響應。若是消息沒法到達首領節點(好比首領節點崩潰,新首領尚未被選舉出來),生產者會收到一個錯誤的響應,爲了不丟失消息,生產者會重發消息(根據配置的retires參數肯定重發次數)。不過若是一個沒有收到消息的節點成爲首領,消息仍是會丟失,這個時候的吞吐量取決於使用的是同步發送仍是異步發送。
3、acks=all。只有當集羣中參與複製的全部節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。這種模式是最安全的,可是延遲最高。
⑤、buffer.memory:該參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。默認值爲33554432 字節。若是應用程序發送消息的速度超過發送到服務器的速度,那麼會致使生產者內存不足。這個時候,send() 方法會被阻塞,若是阻塞的時間超過了max.block.ms (在kafka0.9版本以前爲block.on.buffer.full 參數)配置的時長,則會拋出一個異常。
⑥、compression.type:該參數用於配置生產者生成數據時能夠壓縮的類型,默認值爲 none(不壓縮)。還能夠指定snappy、gzip或lz4等類型,snappy 壓縮算法佔用較少的 CPU,gzip 壓縮算法佔用較多的 CPU,可是壓縮比最高,若是網絡帶寬比較有限,可使用該算法,使用壓縮能夠下降網絡傳輸開銷和存儲開銷,這每每是 kafka 發送消息的瓶頸所在。
⑦、retires:該參數用於配置當生產者發送消息到服務器失敗,服務器返回錯誤響應時,生產者能夠重發消息的次數,若是達到了這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者會在每次重試之間等待100ms,能夠經過 retry.backoff.on 參數來改變這個時間間隔。
還有一些屬性配置,能夠參考官網:http://kafka.apachecn.org/documentation.html#producerconfigs
前面咱們介紹過,消息要到網絡上進行傳輸,必須進行序列化,而序列化器的做用就是如此。
①、默認序列化器
Kafka 提供了默認的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型(IntegerSerializer)和字節數組(BytesSerializer)序列化器,這些序列化器都實現了接口(org.apache.kafka.common.serialization.Serializer)基本上可以知足大部分場景的需求。
下面是Kafka 實現的字符串序列化器 StringSerializer:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.kafka.common.serialization; import java.io.UnsupportedEncodingException; import java.util.Map; import org.apache.kafka.common.errors.SerializationException; public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; public StringSerializer() { } public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get("serializer.encoding"); } if (encodingValue instanceof String) { this.encoding = (String)encodingValue; } } public byte[] serialize(String topic, String data) { try { return data == null ? null : data.getBytes(this.encoding); } catch (UnsupportedEncodingException var4) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding); } } public void close() { } }
其中接口 serialization:
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package org.apache.kafka.common.serialization; 7 8 import java.io.Closeable; 9 import java.util.Map; 10 11 public interface Serializer<T> extends Closeable { 12 void configure(Map<String, ?> var1, boolean var2); 13 14 byte[] serialize(String var1, T var2); 15 16 void close(); 17 }
②、自定義序列化器
若是Kafka提供的幾個默認序列化器不能知足要求,即發送到 Kafka 的消息不是簡單的字符串或整型,那麼咱們能夠自定義序列化器。
好比對於以下的實體類 Person:
1 package com.ys.utils; 2 3 /** 4 * Create by YSOcean 5 */ 6 public class Person { 7 private String name; 8 private int age; 9 10 public String getName() { 11 return name; 12 } 13 14 public void setName(String name) { 15 this.name = name; 16 } 17 18 public int getAge() { 19 return age; 20 } 21 22 public void setAge(int age) { 23 this.age = age; 24 } 25 }
咱們自定義一個 PersonSerializer:
1 package com.ys.utils; 2 3 import org.apache.kafka.common.serialization.Serializer; 4 5 import java.io.UnsupportedEncodingException; 6 import java.nio.ByteBuffer; 7 import java.util.Map; 8 9 /** 10 * Create by YSOcean 11 */ 12 public class PersonSerializer implements Serializer<Person> { 13 14 @Override 15 public void configure(Map map, boolean b) { 16 //不作任何配置 17 } 18 19 @Override 20 /** 21 * Person 對象被序列化成: 22 * 表示 age 的4 字節整數 23 * 表示 name 長度的 4 字節整數(若是爲空,則長度爲0) 24 * 表示 name 的 N 個字節 25 */ 26 public byte[] serialize(String topic, Person data) { 27 if(data == null){ 28 return null; 29 } 30 byte[] name; 31 int stringSize; 32 try { 33 if(data.getName() != null){ 34 name = data.getName().getBytes("UTF-8"); 35 stringSize = name.length; 36 }else{ 37 name = new byte[0]; 38 stringSize = 0; 39 } 40 ByteBuffer buffer = ByteBuffer.allocate(4+4+stringSize); 41 buffer.putInt(data.getAge()); 42 buffer.putInt(stringSize); 43 buffer.put(name); 44 return buffer.array(); 45 } catch (UnsupportedEncodingException e) { 46 e.printStackTrace(); 47 } 48 return new byte[0]; 49 } 50 51 @Override 52 public void close() { 53 //不須要關閉任何東西 54 } 55 }
上面例子序列化將Person類的 age 屬性序列化爲 4 個字節,後期若是該類發生更改,變爲長整型 8 個字節,那麼可能會存在新舊消息兼容性問題。
所以一般不建議自定義序列化器,可使用下面介紹的已有的序列化框架。
③、序列化框架
上面咱們知道自定義序列化器可能會存在新舊消息兼容性問題,須要咱們手動去維護,那麼爲了省去此麻煩,咱們可使用一些已有的序列化框架。好比 JSON、Avro、Thrift 或者 Protobuf。
①、普通發送——發送就忘記
//一、經過上面的配置文件生成 Producer 對象 Producer producer = new KafkaProducer(kafkaProperties); //二、生成 ProducerRecord 對象,並制定 Topic,key 以及 value //建立名爲testTopic的隊列,鍵爲testkey,值爲testValue的ProducerRecord對象 ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue"); //三、發送消息 producer.send(record);
經過配置文件構造一個生產者對象 producer,而後指定主題名稱,鍵值對,構造一個 ProducerRecord 對象,最後使用生產者Producer 的 send() 方法發送 ProducerRecord 對象,send() 方法會返回一個包含 RecordMetadata 的 Future 對象,不過一般咱們會忽略返回值。
和上面的名字同樣——發送就忘記,生產者只管發送,並無論發送的結果是成功或失敗。一般若是咱們不關心發送結果,那麼就可使用此種方式。
②、同步發送
//一、經過上面的配置文件生成 Producer 對象 Producer producer = new KafkaProducer(kafkaProperties); //二、生成 ProducerRecord 對象,並制定 Topic,key 以及 value //建立名爲testTopic的隊列,鍵爲testkey,值爲testValue的ProducerRecord對象 ProducerRecord<String,String> record = new ProducerRecord<>("testTopic","testkey","testValue"); //三、同步發送消息 try { //經過send()發送完消息後返回一個Future對象,而後調用Future對象的get()方法等待kafka響應 //若是kafka正常響應,返回一個RecordMetadata對象,該對象存儲消息的偏移量 //若是kafka發生錯誤,沒法正常響應,就會拋出異常,咱們即可以進行異常處理 producer.send(record).get(); } catch (Exception e) { //四、異常處理 e.printStackTrace(); }
和上面普通發送消息同樣,只不過這裏咱們調用了 Future 對象的 get() 方法來等待 kafka 服務器的響應,程序運行到這裏會產生阻塞,直到獲取kafka集羣的響應。而這個響應有兩種狀況:
一、正常響應:返回一個 RecordMetadata 對象,經過該對象咱們可以獲取消息的偏移量、分區等信息。
二、異常響應:基本上來講會發生兩種異常,
一類是可重試異常,該錯誤能夠經過重發消息來解決。好比鏈接錯誤,能夠經過再次鏈接後繼續發送上一條未發送的消息;再好比集羣沒有首領(no leader),由於咱們知道集羣首領宕機以後,會有一個時間來進行首領的選舉,若是這時候發送消息,確定是沒法發送的。
二類是沒法重試異常,好比消息太大異常,對於這類異常,KafkaProducer 不會進行任何重試,直接拋出異常。
同步發送消息適合須要保證每條消息的發送結果,優勢是可以精確的知道什麼消息發送成功,什麼消息發送失敗,而對於失敗的消息咱們也能夠採起措施進行從新發送。缺點則是增長了每條消息發送的時間,當發送消息頻率很高時,此種方式便不適合了。
③、異步發送
有同步發送,基本上就會有異步發送了。同步發送每發送一條消息都得等待kafka服務器的響應,以後才能發送下一條消息,那麼咱們不是在錯誤產生時立刻處理,而是記錄異常日誌,而後立刻發送下一條消息,而這個異常再經過回調函數去處理,這就是異步發送。
一、首先咱們要實現一個繼承 org.apache.kafka.clients.producer.Callback 接口,而後實現其惟一的 onCompletion 方法。
package com.ys.utils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; /** * Create by YSOcean */ public class KafkaCallback implements Callback{ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ //異常處理 e.printStackTrace(); } } }
二、發送消息時,傳入這個回調類。
//異步發送消息 producer.send(record,new KafkaCallback());