(1)不一樣的應用場景對消息有不一樣的需求,便是否容許消息丟失、重複、延遲以及吞吐量的要求。不一樣場景對Kafka生產者的API使用和配置會有直接的影響。算法
例子1:信用卡事務處理系統,不容許消息的重複和丟失,延遲最大500ms,對吞吐量要求較高。apache
例子2:保存網站的點擊信息,容許少許的消息丟失和重複,延遲能夠稍高(用戶點擊連接能夠立刻加載出頁面便可),吞吐量取決於用戶使用網站的頻度。bootstrap
(2)Kafka發送消息的主要步驟數組
消息格式:每一個消息是一個ProducerRecord對象,必須指定消息所屬的Topic和消息值Value,此外還能夠指定消息所屬的Partition以及消息的Key。緩存
1:序列化ProducerRecord服務器
2:若是ProducerRecord中指定了Partition,則Partitioner不作任何事情;不然,Partitioner根據消息的key獲得一個Partition。這是生產者就知道向哪一個Topic下的哪一個Partition發送這條消息。網絡
3:消息被添加到相應的batch中,獨立的線程將這些batch發送到Broker上併發
4:broker收到消息會返回一個響應。若是消息成功寫入Kafka,則返回RecordMetaData對象,該對象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失敗,返回一個錯誤app
(3)Kafka的順序保證。Kafka保證同一個partition中的消息是有序的,即若是生產者按照必定的順序發送消息,broker就會按照這個順序把他們寫入partition,消費者也會按照相同的順序讀取他們。框架
例子:向帳戶中先存100再取出來 和 先取100再存進去是徹底不一樣的,所以這樣的場景對順序很敏感。
若是某些場景要求消息是有序的,那麼不建議把retries設置成0,。能夠把max.in.flight.requests.per.connection設置成1,會嚴重影響生產者的吞吐量,可是能夠保證嚴格有序。
要往Kafka中寫入消息,須要先建立一個Producer,並設置一些屬性。
Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "broker1:port1, broker2:port2"); kafkaProps.put("key.serializer", "org.apache.kafka.common.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.StringSerializer"); producer = new KafkaProducer<String, String>(kafkaProps);
Kafka的生產者有以下三個必選的屬性:
(1)bootstrap.servers,指定broker的地址清單
(2)key.serializer必須是一個實現org.apache.kafka.common.serialization.Serializer接口的類,將key序列化成字節數組。注意:key.serializer必須被設置,即便消息中沒有指定key。
(3)value.serializer,將value序列化成字節數組
(1)同步發送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");//Topic Key Value try{ Future future = producer.send(record); future.get();//不關心是否發送成功,則不須要這行。 } catch(Exception e) { e.printStackTrace();//鏈接錯誤、No Leader錯誤均可以經過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常 }
(2)異步發送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");//Topic Key Value producer.send(record, new DemoProducerCallback());//發送消息時,傳遞一個回調對象,該回調對象必須實現org.apahce.kafka.clients.producer.Callback接口 private class DemoProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) {//若是Kafka返回一個錯誤,onCompletion方法拋出一個non null異常。 e.printStackTrace();//對異常進行一些處理,這裏只是簡單打印出來 } } }
(1)acks指定必需要有多少個partition副本收到消息,生產者纔會認爲消息的寫入是成功的。
acks=0,生產者不須要等待服務器的響應,以網絡能支持的最大速度發送消息,吞吐量高,可是若是broker沒有收到消息,生產者是不知道的
acks=1,leader partition收到消息,生產者就會收到一個來自服務器的成功響應
acks=all,全部的partition都收到消息,生產者纔會收到一個服務器的成功響應
(2)buffer.memory,設置生產者內緩存區域的大小,生產者用它緩衝要發送到服務器的消息。
(3)compression.type,默認狀況下,消息發送時不會被壓縮,該參數能夠設置成snappy、gzip或lz4對發送給broker的消息進行壓縮
(4)retries,生產者從服務器收到臨時性錯誤時,生產者重發消息的次數
(5)batch.size,發送到同一個partition的消息會被先存儲在batch中,該參數指定一個batch可使用的內存大小,單位是byte。不必定須要等到batch被填滿才能發送
(6)linger.ms,生產者在發送消息前等待linger.ms,從而等待更多的消息加入到batch中。若是batch被填滿或者linger.ms達到上限,就把batch中的消息發送出去
(7)max.in.flight.requests.per.connection,生產者在收到服務器響應以前能夠發送的消息個數
在建立ProducerRecord時,必須指定序列化器,推薦使用序列化框架Avro、Thrift、ProtoBuf等,不推薦本身建立序列化器。
在使用 Avro 以前,須要先定義模式(schema),模式一般使用 JSON 來編寫。
(1)建立一個類表明客戶,做爲消息的value
class Custom { private int customID; private String customerName; public Custom(int customID, String customerName) { super(); this.customID = customID; this.customerName = customerName; } public int getCustomID() { return customID; } public String getCustomerName() { return customerName; } }
(2)定義schema
{ "namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields":[ { "name": "id", "type": "string" }, { "name": "name", "type": "string" }, ] }
(3)生成Avro對象發送到Kafka
Properties props = new Properties(); props.put("bootstrap", "loacalhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", schemaUrl);//schema.registry.url指向射麻的存儲位置 String topic = "CustomerContacts"; Producer<String, Customer> produer = new KafkaProducer<String, Customer>(props); //不斷生成消息併發送 while (true) { Customer customer = CustomerGenerator.getNext(); ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer); producer.send(record);//將customer做爲消息的值發送出去,KafkaAvroSerializer會處理剩下的事情 }
ProducerRecord能夠只包含Topic和消息的value,key默認是null,可是大多數應用程序會用到key,key的兩個做用:
(1)做爲消息的附加信息
(2)決定消息該被寫到Topic的哪一個partition,擁有相同key的消息會被寫到同一個partition。
若是key爲空,kafka使用默認的partitioner,使用RoundRobin算法將消息均衡地分佈在各個partition上;
若是key不爲空,kafka使用本身實現的hash方法對key進行散列,相同的key被映射到相同的partition中。只有在不改變partition數量的前提下,key和partition的映射才能保持不變。
kafka也支持用戶實現本身的partitioner,用戶本身定義的paritioner須要實現Partitioner接口。
參考:《Kafka權威指南》