向Kafka寫入消息從建立一個ProducerRecord對象開始,ProducerRecord須要包含目標主題和要發送的內容,咱們還能夠指定鍵或分區,在發送ProducerRecord對象時,生產者要先把鍵和值對象序列化成字節數組。apache
接下來數據被傳給分區器,若是ProducerRecord指定了分區,那麼分區器就不會作任何事情,直接把指定的分區返回,若是沒有指定分區,那麼分區器會根據ProducerRecord的鍵來選擇一個分區,選好分區之後,生產者就知道該往哪一個主題和分區發送這條記錄了,接着記錄被添加到一個記錄批次裏,這個批次的全部內容都會被髮送到相同的主題和分區,有一個獨立的線程負責把這些記錄批次發送到相應的broker上。bootstrap
服務器在收到消息時會返回一個響應,若是消息成功寫入,就返回一個RecordMetaData對象,它包含主題和分區消息,以及記錄在分區的偏移量,若是寫入失敗,返回一個錯誤。生產者收到錯誤後回嘗試從新發送消息。數組
Kafka生產者有三個必選屬性:安全
bootstrap.servers服務器
該屬性指定broker的地址清單,地址格式host:port,建議至少提供兩個broker的信息,逗號分隔。網絡
key.serializerapp
broker但願接收到的消息的鍵和值都是字節數組,生產者接口容許使用參數化類型,所以能夠把Java對象做爲鍵和值發送給broker。這樣代碼具備良好的可讀性,不過生產者須要知道若是把這些Java對象序列化爲字節數組。異步
key.serializer必須設置爲一個實現了org.apache.kafka.common.serialization.Serializer接口的類,生產者會使用這個類把鍵值對象轉換成字節數組。Kafka客戶端默認實現了ByteArraySerializer,StringSerializer和IntegerSerializer.socket
Value.serializertcp
若是鍵值不一樣類型,就必須指定不一樣的序列號器。
發送消息主要有三種方式:
1.發送並忘記
把消息發送給服務器,並不關心他是否到達,大多數狀況下消息會到達,但也可能回丟失部分消息。
2.同步發送
使用send 方法發送消息,它會返回一個Future對象,調用get方法進行等待,就能夠知道消息是否發送成功。
3.異步發送
調用send 方法,並指定一個回調函數,服務器在返回響應時調用該函數。爲了使用回調,須要一個實現了org.apache.kafka.clients.producer.Callback接口的類。
生產者的配置
1.acks
Acks參數指定了必需要有多少分區副本收到消息,生產者纔會認爲寫入是成功的,這個參數對消息丟失的可能性有重要影響。
acks=0
生產者在成功寫入消息以前不會等待任何來自服務器的響應,也就是說,若是服務器沒有收到消息,生產者無從得知,消息就丟失了,固然生產者由於不須要等待服務器響應,因此能夠從網絡支持的最大速度發送消息,從而達到很高的吞吐量。
acks=1
只要集羣的首領節點收到消息,生產者就會獲得服務器的響應,若是消息沒法到達首領節點,生產者就會重發,若是此時首領節點崩潰,從新選舉,則消息仍是會丟失。這個時候的吞吐量取決於同步發送仍是異步發送。即使是異步發送,吞吐量仍是會收到發送中消息數量的限制,好比生產者在收到服務器響應以前能夠發送多少消息。
acks=all
只有當全部參與複製的節點所有收到消息,生產者纔會收到來自服務器的響應。這種模式最安全,不會丟失消息。固然延遲最高,速度也最慢。
2.buffer.memory
該參數用於設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。若是應用程序發送消息的速度超過了發送到服務器的速度,會致使生產者內存空間不足,這個時候send方法要被阻塞,要麼拋出異常。
3.compression.type
默認狀況下消息不被壓縮,該參數能夠設置爲snappy,gzip,lz4,使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是向Kafka發送消息的瓶頸所在。
4.retries
生產者從服務器收到的錯誤有多是臨時性錯誤,這種狀況下retries參數的值決定了生產者能夠重發消息的次數,達到這個次數,若是消息依然沒有成功發送,就會被放棄重試,返回錯誤。默認狀況下,每次重試以前等待100ms(能夠經過修改retry.backoff.ms改變),建議這個時間設置得要不Kafka從崩潰中恢復的時間要長
5.batch.size
當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次。該參數指定了一個批次可使用的內存大小,按照字節數計算,不會形成延遲,只是會佔用更多內存。
6.linger.ms
該參數指定了生產者在發送批次以前等待的最大時間,Kafka會在批次填滿或者linger.ms到達上限時發送。
服務器用它識別消息的來源
8.max.in.flight.requests.per.connection
該參數指定了生產者接受到服務器響應以前能夠發送多少個消息,它的值越高,就會佔用更多內存,不過也會提高吞吐量,把它設爲1能夠保證消息按照發送的順序寫入服務器,即便發送了重試。
9.timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms
request.timeout.ms指定了生產者發送數據時等待服務器返回響應的時間
metadata.fetch.timeout.ms指定了生產者在得到元數據時等待服務器返回響應的時間
timeout.ms指定了broker 等待同步副本返回消息確認的時間,與acks的配置相匹配——若是在指定時間內沒有收到同步副本的確認,那麼broker就會返回錯誤。
10.max.block.ms
該參數指定了調用send方法或者使用partitionFor獲取元數據時生產者的阻塞時間,當生產者發送緩衝區已滿,或者沒有可用的元數據時,這些方法就會阻塞,阻塞時間超過max.block.ms時就會拋出超時異常。
11.max.request.size
該參數用於空着生產者發送的請求大小,它指定發送單個消息的最大值,也能夠指定單個請求裏全部消息總的大小。
12.receive.buffer.bytes和send.buffer.bytes
這兩個參數分別指定了tcp socket接收和發送數據包的緩衝區的大小,若是設置爲-1,就使用操做系統的默認值,若是生產者或消費者與broker處於不一樣數據中心,那麼能夠適當增大這些值。