kafka學習筆記-生產者(二)
三、發送消息到Kafka
- 最簡單的方式
- 一、生產者的send()方法將ProducerRecord對象做爲參數,因此藥建立一個ProducerRecord對象。
- 二、咱們使用生產者的send()方法發送ProducerRecord對象。
- 從生產者的架構圖裏面能夠看到,消息先試被放進緩衝區,而後使用單獨的線程發送到服務端。
- send()方法會返回一個包含RecordMetadata的Future對象,不過由於咱們會忽略返回值,因此沒法知道消息是否發送成功
- 若是不關心發送結果,那麼可使用這種發送方式。
- 三、其餘異常
- 咱們能夠忽略發送消息時可能發生的錯誤或在服務器端可能發生的錯誤,可是在發送消息以前,生產者我仍是有可能發生其餘異常。
- 這些異常有多是SerializationException(序列化失敗)、BufferExhaustedException或TimeoutException(說明緩衝區已經滿了)又或者是InterruptException(說明線程被中斷)
- 同步消息發送
- 一、Producer.send()
- 方法返回一個Future對象,而後調用Future對象的get()方法等待Kafka響應。
- 若是服務器返回作粗,get()方法會跑出異常,
- 若是沒有發生錯誤,咱們會獲得一個RecordMetaData對象,能夠用它獲取消息的偏移量。
- 二、若是在發送數據以前回或者在發送過程當中發生任何需錯誤,記錄下來
- 可重試錯誤:鏈接錯誤:能夠經過再次連接來解決。「無主」錯誤:能夠經過從新爲分區選舉首領來解決。若是屢次重試任然沒法解決,應用程序會收到一個重試異常。
- 沒法經過重試解決:消息太大:對於這類錯誤,KafkaProducer不會進行任何重試,直接拋出異常。
- 異步發送消息
- 若是kafka集羣之間一個來回須要10ms,若是在發送完每一個消息來回須要10ms.若是在發送完每一個消息以後都等待迴應,那麼發送一百個消息就須要1秒。但若是隻發送消息不等待響應,那麼發送100個消息。所須要的時間會少不少
- 大多數的狀況下咱們並不須要等待響應。
- 一、爲了使用回調,須要一個實現了org.apache.kafka.clients.producer.Callback接口的類,這個類只有一個onCompletion方法。
- 二、若是kafka返回一個錯誤,onCompletion方法會拋出一個非空異常。這裏咱們只是簡單的打印出來,可是生產環境應該有更好的處理方式。
- 三、記錄與以前的同樣
- 四、在發送消息時穿進去一個回調對象。
四、生產者配置
- acks:指定了必須有多少個分區副本接收到消息,生產者纔會認爲是消息寫入成功了。這個參數對於消息丟失有重要的影響。
- acks=0:生產者在成功寫入消息以前不會等待任何來自服務器的響應。肯能會致使數據丟失,可是吞吐量高
- acks=1:只要首節點收到消息,生產者就會收到一個來自服務器的成功響應。若是一個沒有收到消息的節點成爲新首領會致使消息丟失。這時候的吞吐量取決是同步發送仍是異步發送分。
- ack=all:只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的響應。這種模式是安全的名單時延遲高。
- buffer.memory 該參數用來設置生產者內存緩衝區的大小,生產者用它緩存要發送到服務器的消息
- 若是應用程序發送消息的速度超過發送到服務器的速度,會致使生產者空間不足
- 這個時候send()方法調用要麼被阻塞,要麼拋異常,取決於如何設置block.on.buffer.full參數。
- compression.type 它指定了消息被髮送給broker以前使用哪種算法進行壓縮,默認不壓縮,該參數能夠設置爲snappy, gzip或lz4。
- snappy壓縮算法是谷歌發明的,它佔用較少的CPU,提供較好的性能和至關可觀的壓縮比,若是比較關注性能和網絡寬帶,可使用這種算法。
- gzip通常使用較多的CPU,可是會提供更高的壓縮比,因此若是網絡寬帶有限,可使用這種算法。
- 使用壓縮能夠下降網絡傳輸開銷和存儲開銷,而這每每是kafka發送消息的瓶頸所在。
- retires:生產者收到的服務器返回的錯誤多是臨時的(好比分區找不到首領),在這種狀況下,這個參數決定了重發的次數。
- 默認狀況下,生產者每次重試之間會等待100ms。能夠經過retry.backoff.ms參數來改變時間間隔。
- 通常狀況下,由於生產者會自動進行重試,因此就不必在代碼邏輯處理那些可重試錯誤,你只須要處理那些不可重試的錯誤,或者重試次數超出上限的狀況。
- batch.size 當多個消息須要被髮送到同一個分區的時候,生產者會把他們放在同一個批次裏。該參數指定了一個批次能夠用的內存大小,按照字節計算(而不是消息個數)
- 當批次被填滿,批次裏面的全部消息會被髮送出去。
- 生產者不必定會等到批次被佔滿才發送數據。
- 咱們通常把這個之設置的很大,也不會形成延遲,只是會多佔用一些內存。
- 若是設置的過小,生產者須要更頻繁的發送消息,會增長一些額外一些開銷。
- linger.ms 該參數指定了生產者在發送批次以前等待更多消息加入批次的時間。
- kafkaProducer會在批次填滿或者linger.ms達到上限時把批次發出去。
- 把linger.ms設置成比0大的數,讓生產者在發送以前等待一下子,使更多的消息加入這個批次,雖然這樣會增長延遲,可是也會提升吞吐量(由於一次性發送更多的消息,每一個消息的開銷就變小了)
- client.id 消息標識,該參數能夠是任何的字符串,服務器用它來識別消息來源,還能夠用在日誌和配額指標裏面。
- max.in.flight.requests.per.connection: 指定了生產者在收到服務器響應以前能夠發送多個消息。
- 值越大,佔用的內存越大
- 值越大,吞吐量越高
- 設置爲1,保證消息順序發送,即便發生了重試。
- timeout.ms、request.timeout.ms、metadata.fetch.timeout.ms
- timeout.ms 指定了broker等待同步副本返回消息確認的時間,與acks的配置相匹配
- 若是指定時間沒收到同步副本確認,broker返回一個錯誤
- request.timeout.ms 生產者在發送數據的時候等待服務器返回響應的時間。
- metadata.fetch.timeout.ms 生產者在獲取元數據(好比目標分區的首領是誰)是等待服務器返回響應的時間。
- max.block.ms 指定了在調用send()方法或者使用partitionFor()方法獲取元數據時生產者阻塞時間。
- 當生產者的發送緩衝區已滿,或沒有可用元數據,這些方法就會阻塞
- 在阻塞時間達到配置的值時,生產者就會跑出異常
- max.request.size 用於控制生產者發送的請求大小。能夠指發送消息的單個大小,也能夠指單個請求裏全部消息的大小。
- receive.buffer.bytes、send.buffer.bytes 分別指定了Tcp socket接收和發送數據的緩衝區大小。
- -1,就是使用系統的默認值
- 若是生產者或消費者與broker處於不一樣的數據中心,那麼能夠適當增大這個值。
歡迎關注本站公眾號,獲取更多信息