kafka系列(07):Kafka 的生產者

生產者發送消息的基本流程

從建立一個 ProducerRecord 對象開始, Producer Record 對象須要包含目標主題和要發送的內容。咱們還能夠指定鍵或分區。在發送 ProducerRecord 對象時,生產者要先把鍵和值對象序列化成字節數組,這樣它們纔可以在網絡上傳輸。html

接下來,數據被傳給分區器。若是以前在 Producer Record 對象裏指定了分區,那麼分區器就不會再作任何事情,直接把指定的分區返回。若是沒有 指定分區,那麼分區器會根據 Producer Record 對象的鍵來選擇一個分區。選好分區之後,生產者就知道該往哪一個主題和分區發送這條記錄了。緊接着, 這條記錄被添加到一個記錄批次裏(雙端隊列,尾部寫入),這個批次裏的全部消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。算法

服務器在收到這些消息時會返回一個響應。若是消息成功寫入 Kafka ,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分 區裏的偏移量。若是寫入失敗, 則會返回一個錯誤。生產者在收到錯誤以後會嘗試從新發送消息,幾回以後若是仍是失敗,就返回錯誤信息。spring

生產者發送消息通常會發生兩類錯誤:apache

一類是可重試錯誤,好比鏈接錯誤(可經過再次創建鏈接解決)、無主 no leader(可經過分區從新選舉首領解決)。數組

另外一類是沒法經過重試解決,好比「消息太大」異常,具體見 message.max.bytes,這類消息不會進行任何重試,直接拋出異常緩存

使用 Kafka 生產者

三種發送方式安全

咱們經過生成者的 send 方法進行發送。send 方法會返回一個包含 RecordMetadata 的 Future 對象。RecordMetadata 裏包含了目標主題,分區信息和 消息的偏移量。服務器

發送並忘記網絡

忽略 send 方法的返回值,不作任何處理。大多數狀況下,消息會正常到達,並且生產者會自動重試,但有時會丟失消息。多線程

同步發送

得到 send 方法返回的 Future 對象,在合適的時候調用 Future 的 get 方法。參見代碼,模塊 kafka-no-spring

異步發送

實現接口 org.apache.kafka.clients.producer.Callback,而後將實現類的實例做爲參數傳遞給 send 方法。參見代碼,模塊 kafka-no-spring 下包 sendtype 中。

多線程下的生產者

KafkaProducer 的實現是線程安全的,因此咱們能夠在多線程的環境下,安全的使用 KafkaProducer 的實例,如何節約資源的使用呢?參見代碼,模塊 kafka-no-spring 下包 concurrent

 

更多發送配置

生產者有不少屬性能夠設置,大部分都有合理的默認值,無需調整。有些參數可能對內存使用,性能和可靠性方面有較大影響。能夠參考 org.apache.kafka.clients.producer 包下的 ProducerConfig 類。代碼見模塊 kafka-no-spring 下包 ProducerConfig 中 ConfigKafkaProducer 類

acks:

Kafk 內部的複製機制是比較複雜的,這裏不談論內部機制(後續章節進行細講),咱們只討論生產者發送消息時與副本的關係。

指定了必需要有多少個分區副本收到消息,生產者纔會認爲寫入消息是成功的,這個參數對消息丟失的可能性有重大影響。

acks=0:生產者在寫入消息以前不會等待任 何來自服務器的響應,容易丟消息,可是吞吐量高。

acks=1:只要集羣的首領節點收到消息,生產者會收到來自服務器的成功響應。若是消息沒法到達首領節點(好比首領節點崩潰,新首領沒有選舉出 來),生產者會收到一個錯誤響應,爲了不數據丟失,生產者會重發消息。不過,若是一個沒有收到消息的節點成爲新首領,消息仍是會丟失。默認 使用這個配置。

acks=all:只有當全部參與複製的節點都收到消息,生產者纔會收到一個來自服務器的成功響應。延遲高。

金融業務,主備外加異地災備。因此不少高可用場景通常不是設置 2 個副本,有可能達到 5 個副本,不一樣機架上部署不一樣的副本,異地上也部署一 套副本。

 

buffer.memory

設置生產者內存緩衝區的大小(結合生產者發送消息的基本流程),生產者用它緩衝要發送到服務器的消息。若是數據產生速度大於向 broker 發送 的速度,致使生產者空間不足,producer 會阻塞或者拋出異常。缺省 33554432 (32M)

max.block.ms

指定了在調用 send()方法或者使用 partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些 方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。缺省 60000ms

retries

發送失敗時,指定生產者能夠重發消息的次數(缺省 Integer.MAX_VALUE)。默認狀況下,生產者在每次重試之間等待 100ms,能夠經過參數 retry.backoff.ms 參數來改變這個時間間隔。

receive.buffer.bytes 和 send.buffer.bytes

指定 TCP socket 接受和發送數據包的緩存區大小。若是它們被設置爲-1,則使用操做系統的默認值。若是生產者或消費者處在不一樣的數據中心,那麼 能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。缺省 102400

batch.size

當多個消息被髮送同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。當批次內存 被填滿後,批次裏的全部消息會被髮送出去。可是生產者不必定都會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也有可能被髮送。缺省 16384(16k) ,若是一條消息超過了批次的大小,會寫不進去。

linger.ms

指定了生產者在發送批次前等待更多消息加入批次的時間。它和 batch.size 以先到者爲先。也就是說,一旦咱們得到消息的數量夠 batch.size 的數量 了,他將會當即發送而不顧這項設置,然而若是咱們得到消息字節數比 batch.size 設置要小的多,咱們須要「linger」特定的時間以獲取更多的消息。這個設 置默認爲 0,即沒有延遲。設定 linger.ms=5,例如,將會減小請求數目,可是同時會增長 5ms 的延遲,但也會提高消息的吞吐量。

compression.type

producer 用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是 none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越 好。snappy 佔用 cpu 少,提供較好的性能和可觀的壓縮比,若是比較關注性能和網絡帶寬,用這個。若是帶寬緊張,用 gzip,會佔用較多的 cpu,但提供 更高的壓縮比。

client.id

當向 server 發出請求時,這個字符串會發送給 server。目的是可以追蹤請求源頭,以此來容許 ip/port 許可列表以外的一些應用能夠發送信息。這項 應用能夠設置任意字符串,由於沒有任何功能性的目的,除了記錄和跟蹤。

max.in.flight.requests.per.connection

指定了生產者在接收到服務器響應以前能夠發送多個消息,值越高,佔用的內存越大,固然也能夠提高吞吐量。發生錯誤時,可能會形成數據的發 送順序改變,默認是 5 (修改)。 若是須要保證消息在一個分區上的嚴格順序,這個值應該設爲 1。不過這樣會嚴重影響生產者的吞吐量。

request.timeout.ms

客戶端將等待請求的響應的最大時間,若是在這個時間內沒有收到響應,客戶端將重發請求;超太重試次數將拋異常,默認 30 秒。

metadata.fetch.timeout.ms

是指咱們所獲取的一些元數據的第一個時間數據。元數據包含:topic,host,partitions。此項配置是指當等待元數據 fetch 成功完成所須要的時間, 不然會跑出異常給客戶端

max.request.size

控制生產者發送請求最大大小。默認這個值爲 1M,若是一個請求裏只有一個消息,那這個消息不能大於 1M,若是一次請求是一個批次,該批次包 含了 1000 條消息,那麼每一個消息不能大於 1KB。注意:broker 具備本身對消息記錄尺寸的覆蓋,若是這個尺寸小於生產者的這個設置,會致使消息被拒 絕。這個參數和 Kafka 主機的 message.max.bytes 參數有關係。若是生產者發送的消息超過 message.max.bytes 設置的大小,就會被 Kafka 服務器拒絕。

以上參數不用去,通常來講,就記住 acks、batch.size、linger.ms、max.request.size 就好了,由於這 4 個參數重要些,其餘參數通常沒有太大必要調整。

 

順序保證

Kafka 能夠保證同一個分區裏的消息是有序的。也就是說,發送消息時,主題只有且只有一個分區,同時生產者按照必定的順序發送消息, broker 就 會按照這個順序把它們寫入分區,消費者也會按照一樣的順序讀取它們。在某些狀況下, 順序是很是重要的。例如,往一個帳戶存入 100 元再取出來, 這個與先取錢再存錢是大相徑庭的!不過,有些場景對順序不是很敏感。

 

 

若是把 retires 設爲非零整數,同時把 max.in.flight.requests.per.connection 設爲比 1 大的數,那麼,若是第一個批次消息寫入失敗,而第二個批次寫 入成功, broker 會重試寫入第一個批次。若是此時第一個批次也寫入成功,那麼兩個批次的順序就反過來了。

通常來講,若是某些場景要求消息是有序的,那麼消息是否寫入成功也是很關鍵的,因此不建議把 retires 設爲 0(不重試的話消息可能會由於鏈接關 閉等緣由會丟) 。因此仍是須要重試,同時把 max.in.flight.request.per.connection 設爲 1,這樣在生產者嘗試發送第一批消息時,就不會有其餘的消息發 送給 broker 。不過這樣會嚴重影響生產者的吞吐量,因此只有在對消息的順序有嚴格要求的狀況下才能這麼作

序列化

建立生產者對象必須指定序列化器,默認的序列化器並不能知足咱們全部的場景。咱們徹底能夠自定義序列化器。只要實現 org.apache.kafka.common.serialization.Serializer 接口便可。

自定義序列化須要考慮的問題

自定義序列化容易致使程序的脆弱性。舉例,在咱們上面的實現裏,咱們有多種類型的消費者,每一個消費者對實體字段都有各自的需求,好比,有 的將字段變動爲 long 型,有的會增長字段,這樣會出現新舊消息的兼容性問題。特別是在系統升級的時候,常常會出現一部分系統升級,其他系統被迫 跟着升級的狀況。

解決這個問題,能夠考慮使用自帶格式描述以及語言無關的序列化框架。好比 Protobuf,或者 Kafka 官方推薦的 Apache Avro。

Avro 會使用一個 JSON 文件做爲 schema 來描述數據,Avro 在讀寫時會用到這個 schema,能夠把這個 schema 內嵌在數據文件中。這樣,無論數據格 式如何變更,消費者都知道如何處理數據。

可是內嵌的消息,自帶格式,會致使消息的大小沒必要要的增大,消耗了資源。咱們可使用 schema 註冊表機制,將全部寫入的數據用到的 schema 保存在註冊表中,而後在消息中引用 schema 的標識符,而讀取的數據的消費者程序使用這個標識符從註冊表中拉取 schema 來反序列化記錄。

注意:Kafka 自己並不提供 schema 註冊表,須要藉助第三方,如今已經有不少的開源實現,好比 Confluent Schema Registry,能夠從 GitHub 上獲取。

如何使用參考以下網址:

https://cloud.tencent.com/developer/article/1336568

不過通常除非你使用 Kafka 須要關聯的團隊比較大,敏捷開發團隊纔會使用,通常的團隊用不上。對於通常的狀況使用 JSON 足夠了。

分區

咱們在新增 ProducerRecord 對象中能夠看到,ProducerRecord 包含了目標主題,鍵和值,Kafka 的消息都是一個個的鍵值對。鍵能夠設置爲默認的 null。鍵的主要用途有兩個:一,用來決定消息被寫往主題的哪一個分區,擁有相同鍵的消息將被寫往同一個分區,二,還能夠做爲消息的附加消息。

若是鍵值爲 null,而且使用默認的分區器,分區器使用輪詢算法將消息均衡地分佈到各個分區上。

若是鍵不爲空,而且使用默認的分區器,Kafka 對鍵進行散列(Kafka 自定義的散列算法,具體算法原理不知),而後根據散列值把消息映射到特定 的分區上。很明顯,同一個鍵老是被映射到同一個分區。可是隻有不改變主題分區數量的狀況下,鍵和分區之間的映射才能保持不變,一旦增長了新的 分區,就沒法保證了,因此若是要使用鍵來映射分區,那就要在建立主題的時候把分區規劃好,並且永遠不要增長新分區

自定義分區器

某些狀況下,數據特性決定了須要進行特殊分區,好比電商業務,北京的業務量明顯比較大,佔據了總業務量的 20%,咱們須要對北京的訂單進行 單獨分區處理,默認的散列分區算法不合適了, 咱們就能夠自定義分區算法,對北京的訂單單獨處理,其餘地區沿用散列分區算法。或者某些狀況下, 咱們用 value 來進行分區。 具體實現,先建立一個 4 分區的主題,而後觀察模塊 kafka-no-spring 下包 SelfPartitionProducer 中代碼。

相關文章
相關標籤/搜索