kafka發送超大消息設置

最近開發一cdc應用,爲了測試極端狀況,須要kafka傳遞100萬條數據過去,1個G左右,因爲其餘環節限制,不便進行拆包(注:測下來,大包走kafka不必定性能更好,甚至可能更低)。html

測試百萬以上的變動數據時,報消息超過kafka broker容許的最大值,所以須要修改以下參數,保證包可以正常發送:算法

socket.request.max.bytes=2147483647    # 設置了socket server接收的最大請求大小
log.segment.bytes=2147483647              # kafka數據文件的大小,確保這個數值大於一個消息的長度。通常說來使用默認值便可(通常一個消息很難大於1G,由於這是一個消息系統,而不是文件系統)。
message.max.bytes=2147483647             # 設置了kafka server接收的最大消息大小,應小於等於socket.request.max.bytes
replica.fetch.max.bytes=2147483647         #每一個分區試圖獲取的消息字節數。要大於等於message.max.bytes,不然broker會接收此消息,但沒法將此消息複製出去,從而形成數據丟失。
fetch.message.max.bytes=2147483647      #每一個提取請求中爲每一個主題分區提取的消息字節數。要大於等於message.max.bytes,不然broker就會由於消費端沒法使用這個消息而掛起。apache

生產者能夠以下設定:服務器

kafkaProps.put("max.request.size", 2147483647);    # 要小於 message.max.bytes,也能夠設置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
kafkaProps.put("timeout.ms", 3000000);
kafkaProps.put("request.timeout.ms", 30000000);

消費者設定以下:session

props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");

各參數的含義能夠參考kafka官方文檔https://kafka.apache.org/documentation/#configuration。app

kafka基礎知識體系,請參考LZ學習筆記kafka學習指南(總結版)socket

注,各參數對內存的影響以下:Brokers會爲每一個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則須要差很少1G的內存,確保 分區數*最大的消息不會超過服務器的內存,不然會報OOM錯誤。一樣地,消費端的fetch.message.max.bytes指定了最大消息須要的內存空間,一樣,分區數*最大須要內存空間 不能超過服務器的內存。因此,若是你有大的消息要傳送,則在內存必定的狀況下,只能使用較少的分區數或者使用更大內存的服務器。性能

  雖然上面的方法能夠奏效,可是並不推薦。Kafka設計的初衷是迅速處理短小的消息,通常10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,咱們須要處理更大的消息,好比XML文檔或JSON內容,一個消息差很少有10-100M,這種狀況下,Kakfa應該如何處理?學習

針對這個問題,有如下幾個建議:測試

  1.   最好的方法是不直接傳送這些大的數據。若是有共享存儲,如NAS, HDFS, S3等,能夠把這些大的文件存放到共享存儲,而後使用Kafka來傳送文件的位置信息。
  2.   第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片爲10K大小,使用分區主鍵確保一個大消息的全部部分會被髮送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分從新還原爲原始的消息。
  3.   第三,Kafka的生產端能夠壓縮消息,若是原始消息是XML,當經過壓縮以後,消息可能會變得不那麼大。在生產端的配置參數中使用compression.codec和commpressed.topics能夠開啓壓縮功能,壓縮算法可使用GZip或Snappy。

上面這些值太大還會形成一個問題,就是消息沒有在指定時間內(max.poll.interval.ms(默認300秒))消費完,致使被rebalance,而kafka自己有個bug(服務器端的rebalance.timeout.ms(默認60秒)不生效),這會致使消費者組的rebalance時間比較長,因此這是須要注意的,參見https://blog.csdn.net/u013200380/article/details/87868696。

相關文章
相關標籤/搜索