對應代碼倉庫地址:https://github.com/T0UGH/getting-started-kafkajava
參考資料:git
kafka權威指南https://book.douban.com/subject/27665114/github
大數據通用的序列化器——Apache Avrohttps://www.jianshu.com/p/0a85bfbb9f5f算法
Kafka 中使用 Avro 序列化框架(一)https://www.jianshu.com/p/4f724c7c497dshell
Kafka 中使用 Avro 序列化組件(三):Confluent Schema Registryhttps://www.jianshu.com/p/cd6f413d35b0apache
本章對應代碼倉庫可查看https://github.com/T0UGH/getting-started-kafka/tree/main/src/main/java/cn/edu/neu/demo/ch3編程
在這一章,咱們將從Kafka生產者的設計和組件講起,學習如何使用Kafka 生產者。json
先展現向Kafka發送消息的主要步驟bootstrap
首先建立一個ProducerRecord對象開始,ProducerRecord 對象須要包含目標主題和要發送的內容,有可能還包含鍵和分區信息數組
把ProducerRecord中的鍵和值序列化成字節數組,這樣它們纔可以在網絡上傳輸
接下來,數據被傳給分區器。
緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的全部消息會被髮送到相同的主題和分區上。
有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。
服務器在收到這些消息時會返回一個響應。
若是消息成功寫入Kafka,就返回一個RecordMetaData對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量。
若是寫入失敗,則會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗,就返回錯誤信息。
下面展現如何建立一個KafkaProducer
Properties kafkaProperties = new Properties(); // fixme: 運行時請修改47.94.139.116:9092爲本身的kafka broker地址 kafkaProperties.put("bootstrap.servers", "47.94.139.116:9092"); kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put("acks", "all"); // 根據配置建立Kafka生產者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(kafkaProperties);
在建立Kafka生產者以前,有3個必選的配置項
配置項 | 解釋 |
---|---|
bootstrap.servers | 該屬性指定broker 的地址清單,地址的格式爲host:port, host:port。 |
key.serializer | key.serializer 必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵序列化成字節數組。Kafka 客戶端默認提供了ByteArraySerializer、StringSerializer 和IntegerSerializer |
value.serializer | 與key.serializer 同樣,value.serializer 指定的類會將值序列化。 |
實例化生產者對象後,接下來就能夠開始發送消息了。發送消息主要有如下3 種方式:
下面分別演示這三種方式
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( "sun", "s1", "cn dota best dota"); // 發送消息 try{ kafkaProducer.send(producerRecord); }catch(Exception e){ e.printStackTrace(); }
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( "sun", "s1", "cn dota best dota"); // 發送消息 try{ kafkaProducer.send(producerRecord).get(); }catch(Exception e){ e.printStackTrace(); }
producer.send()方法先返回一個Future對象,而後調用Future 對象的get()方法會阻塞當前線程來等待Kafka響應。
若是在發送消息以前或者在發送消息的過程當中發生了任何錯誤,好比broker 返回了一個不容許重發消息的異常或者已經超過了重發的次數,那麼就會拋出異常。
KafkaProducer通常會發生兩類錯誤。
若是隻發送消息而不等待響應,那麼能夠避免阻塞線程來等待,從而提升發送效率。
大多數時候,咱們並不須要等待響應——儘管Kafka會把目標主題、分區信息和消息的偏移量發送回來,但對於發送端的應用程序來講不是必需的。不過在遇到消息發送失敗時,咱們須要拋出異常、記錄錯誤日誌,或者把消息寫入「錯誤消息」文件以便往後分析。
這時咱們能夠爲send()方法註冊一個回調函數,讓它來處理異步調用的返回結果
// 建立ProducerRecord,它是一種消息的數據結構 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("sun", "s1", "cn dota best dota"); // 發送消息 kafkaProducer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ e.printStackTrace(); }else{ System.out.println(recordMetadata); } } });
onCompletion
方法的Exception
參數不爲null,咱們能夠針對這個異常進行處理RecordMetadata
不爲null,咱們能夠從中獲取主題信息、分區信息、偏移量信息生產者還有不少可配置的參數,在Kafka 文檔裏都有說明,它們大部分都有合理的默認值,因此沒有必要去修改它們。不過有幾個參數在內存使用、性能和可靠性方面對生產者影響比較大,接下來咱們會一一說明。
acks參數指定了必須要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。
若是acks=0,生產者發送消息以後就馬上認爲消息寫入成功。
若是acks=1,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。
若是acks=all,只有當首領節點和全部複製節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。
retries 參數的值決定了生產者能夠重發消息的次數,若是達到這個次數,生產者會放棄重試並返回錯誤。默認狀況下,生產者會在每次重試之間等待100ms。
由於生產者會自動進行重試,因此就不必在代碼邏輯裏處理那些可重試的錯誤。你只須要處理那些不可重試的錯誤或重試次數超出上限的狀況。
KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。
該參數指定了一個批次可使用的內存大小,按照字節數計算(而不是消息個數)。當批次被填滿,批次裏的全部消息會被髮送出去。
該參數指定了生產者在發送批次以前等待更多消息加入批次的時間
該參數指定了生產者在收到服務器響應以前能夠發送多少個消息。
咱們已經在以前的例子裏看到,建立一個生產者對象必須指定序列化器。咱們已經知道如何使用默認的字符串序列化器,Kafka 還提供了整型和字節數組序列化器,不過它們還不足以知足大部分場景的需求。到最後,咱們須要序列化的記錄類型會愈來愈多。
接下來演示如何開發自定義序列化器,並介紹Avro序列化器。若是發送到Kafka的對象 不是簡單的字符串或整型,那麼可使用序列化框架來建立消息記錄,如Avro、Thrift 或Protobuf,或者使用自定義序列化器。咱們強烈建議使用通用的序列化框架。
/** * 一個簡單的pojo,爲了演示如何自定義序列化器 * */ public class Customer { private int customerId; private String customerName; public Customer(int customerId, String customerName) { this.customerId = customerId; this.customerName = customerName; } public Customer() { } public int getCustomerId() { return customerId; } public void setCustomerId(int customerId) { this.customerId = customerId; } public String getCustomerName() { return customerName; } public void setCustomerName(String customerName) { this.customerName = customerName; } }
/** * 自定義序列化器 * */ public class CustomerSerializer implements Serializer<Customer> { public void configure(Map<String, ?> configs, boolean isKey) { // 不須要配置任何 } /** * Customer對象的序列化函數,組成以下 * 前4字節: customerId * 中間4字節: customerName字節數組長度 * 後面n字節: customerName字節數組 * */ public byte[] serialize(String topic, Customer data) { try{ byte[] serializedName; int stringSize; if(data == null){ return null; }else{ if(data.getCustomerName() != null){ serializedName = data.getCustomerName().getBytes("utf-8"); stringSize = serializedName.length; } else{ serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerId()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch(Exception e){ throw new SerializationException( "Error when serializing Customer to byte[] " + e); } } public byte[] serialize(String topic, Headers headers, Customer data) { return serialize(topic, data); } public void close() { // 不須要關閉任何 } }
不過咱們不建議採用自定義序列化器,理由以下
Apache Avro是一種與編程語言無關的序列化格式。
Avro數據經過與語言無關的schema來定義。
Avro 有一個頗有意思的特性是,當負責寫消息的應用程序使用了新版本的schema,負責讀消息的應用程序能夠繼續處理消息而無需作任何改動,這個特性使得它特別適合用在像Kafka 這樣的消息系統上。
下面舉個例子
假設咱們有一個v0.0.1的schema
{ "namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "faxNumber", "type": ["null", "string"], "default": "null"} ] }過了一段時間,咱們須要刪除
faxnumber
字段(傳真號碼),添加一個{ "namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": "null"} ] }更新到新版的schema 後:
- 舊記錄仍然包含faxNumber 字段
- 而新記錄則包含email 字段
- 部分負責讀取數據的應用程序進行了升級,那麼它們是如何處理這些變化的呢?
在消費者升級以前:
- 它們會調用相似getName()、getId() 和getFaxNumber() 這樣的方法。
- 若是碰到使用新schema 構建的消息,getName() 和getId() 方法仍然可以正常返回,但getFaxNumber() 方法會返回null,由於消息裏不包含傳真號碼。
在消費者升級以後
- getEmail() 方法取代了getFaxNumber() 方法。
- 若是碰到一個使用舊schema 構建的消息,那麼getEmail() 方法會返回null,由於舊消息不包含郵件地址。
如今能夠看出使用Avro 的好處了:咱們修改了消息的schema,但並不須要更新全部消費者,而這樣仍然不會出現異常或阻斷性錯誤,也不須要對現有數據進行大幅更新。
若是在每條Kafka 記錄裏都嵌入schema,會讓記錄的大小成倍地增長。
在以前的例子裏,ProducerRecord對象包含了目標主題、鍵和值。Kafka 的消息是一個個鍵值對,ProducerRecord 對象能夠只包含目標主題和值,鍵能夠設置爲默認的null,不過大多數應用程序會用到鍵。
鍵有兩個用途:
若是要建立鍵爲null的消息,不指定鍵就能夠
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
若是鍵爲null,而且使用了默認的分區器,那麼記錄將被隨機地發送到主題內各個可用的分區上。分區器使用輪詢(Round Robin)算法將消息均衡地分佈到各個分區上。
若是鍵不爲空,而且使用了默認的分區器,那麼Kafka會使用Kafka本身的散列算法對鍵進行散列(使用Kafka 本身的散列算法,即便升級Java 版本,散列值也不會發生變化),而後根據散列值把消息映射到特定的分區上。這裏的關鍵之處在於,同一個鍵老是被映射到同一個分區上。
只有在不改變主題分區數量的狀況下,鍵與分區之間的映射才能保持不變。舉個例子,在分區數量保持不變的狀況下,能夠保證用戶045189 的記錄老是被寫到分區34。在從分區讀取數據時,能夠進行各類優化。不過,一旦主題增長了新的分區,這些就沒法保證了——舊數據仍然留在分區34,但新的記錄可能被寫到其餘分區上。若是要使用鍵來映射分區,那麼最好在建立主題的時候就把分區規劃好,並且永遠不要增長新分區。
默認狀況下,kafka自動建立的主題的分區數量爲1,因此咱們須要先修改分區數量,來讓自定義分區器有點用。
先cd到opt\kafka\bin\
運行命令:
./kafka-topics.sh --zookeeper 47.94.139.116:2181/kafka --alter --topic sun --partitions 4
查看是否修改爲功
./kafka-topics.sh --describe --zookeeper 47.94.139.116:2181/kafka --topic sun
或者也能夠經過修改server.properties
中的nums.partions
並重啓,來更改默認分區數量。
下面自定義一個分區器,它根據鍵來劃分分區:若鍵爲Banana則放入最後一個分區,若鍵不爲Banana則散列到其餘分區。
/** * 自定義分區器 * 若鍵爲Banana則放入最後一個分區,若鍵不爲Banana則散列到其餘分區 * */ public class BananaPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 分區信息列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //分區數量 int partitionsAmount = partitions.size(); // 若是隻有一個分區,那全都放到partition0就完事了 if(partitionsAmount == 1){ return 0; } if(keyBytes == null || ! (key instanceof String)){ throw new InvalidRecordException("We expect all messages to have customer name as key"); } if(key.equals("Banana")){ return partitionsAmount - 1; } return (Math.abs(Utils.murmur2(keyBytes)) % (partitionsAmount - 1)); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
而後,在producer配置中加入kafkaProperties.put("partitioner.class", "cn.edu.neu.demo.ch3.partitioner.BananaPartitioner");
便可。